Skip to content

Commit

Permalink
Fix attestation compatibility issue in distributed validator cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
nflaig committed Mar 10, 2023
1 parent 4d89a0e commit 2c323b6
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 31 deletions.
1 change: 1 addition & 0 deletions packages/cli/src/cmds/validator/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr
afterBlockDelaySlotFraction: args.afterBlockDelaySlotFraction,
scAfterBlockDelaySlotFraction: args.scAfterBlockDelaySlotFraction,
valProposerConfig,
distributed: args.distributed,
},
metrics
);
Expand Down
10 changes: 10 additions & 0 deletions packages/cli/src/cmds/validator/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ export type IValidatorCliArgs = AccountValidatorArgs &
"externalSigner.pubkeys"?: string[];
"externalSigner.fetch"?: boolean;

distributed?: boolean;

interopIndexes?: string;
fromMnemonic?: string;
mnemonicIndexes?: string;
Expand Down Expand Up @@ -277,6 +279,14 @@ export const validatorOptions: CliCommandOptions<IValidatorCliArgs> = {
group: "externalSignerUrl",
},

// Distributed validator

distributed: {
description: "Enables specific features required to run as part of a distributed validator cluster",
default: false,
type: "boolean",
},

// Metrics

metrics: {
Expand Down
69 changes: 42 additions & 27 deletions packages/validator/src/services/attestation.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {phase0, Slot, ssz} from "@lodestar/types";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {sleep} from "@lodestar/utils";
import {Api, ApiError} from "@lodestar/api";
import {Api, ApiError, routes} from "@lodestar/api";
import {toHexString} from "@chainsafe/ssz";
import {IClock, LoggerVc} from "../util/index.js";
import {PubkeyHex} from "../types.js";
Expand All @@ -16,6 +16,8 @@ type AttestationServiceOpts = {
afterBlockDelaySlotFraction?: number;
};

export type AttestationServiceArgs = ConstructorParameters<typeof AttestationService>;

/**
* Previously, submitting attestations too early may cause some attestations missed (because some clients may not queue attestations, and
* sent peers are few) so it was configured as 1/6. See https://github.com/ChainSafe/lodestar/issues/3943
Expand All @@ -29,17 +31,17 @@ const DEFAULT_AFTER_BLOCK_DELAY_SLOT_FRACTION = 0;
* Service that sets up and handles validator attester duties.
*/
export class AttestationService {
private readonly dutiesService: AttestationDutiesService;
protected readonly dutiesService: AttestationDutiesService;

constructor(
private readonly logger: LoggerVc,
private readonly api: Api,
private readonly clock: IClock,
private readonly validatorStore: ValidatorStore,
private readonly emitter: ValidatorEventEmitter,
protected readonly logger: LoggerVc,
protected readonly api: Api,
protected readonly clock: IClock,
protected readonly validatorStore: ValidatorStore,
protected readonly emitter: ValidatorEventEmitter,
chainHeadTracker: ChainHeaderTracker,
private readonly metrics: Metrics | null,
private readonly opts?: AttestationServiceOpts
protected readonly metrics: Metrics | null,
protected readonly opts?: AttestationServiceOpts
) {
this.dutiesService = new AttestationDutiesService(logger, api, clock, validatorStore, chainHeadTracker, metrics);

Expand All @@ -51,7 +53,7 @@ export class AttestationService {
this.dutiesService.removeDutiesForKey(pubkey);
}

private runAttestationTasks = async (slot: Slot, signal: AbortSignal): Promise<void> => {
protected runAttestationTasks = async (slot: Slot, signal: AbortSignal): Promise<void> => {
// Fetch info first so a potential delay is absorbed by the sleep() below
const duties = this.dutiesService.getDutiesAtSlot(slot);
if (duties.length === 0) {
Expand All @@ -66,9 +68,7 @@ export class AttestationService {

// Beacon node's endpoint produceAttestationData return data is not dependant on committeeIndex.
// Produce a single attestation for all committees, and clone mutate before signing
// Downstream tooling may require that produceAttestation is called with a 'real' committee index
// So we pick the first duty's committee index - see https://github.com/ChainSafe/lodestar/issues/4687
const attestationNoCommittee = await this.produceAttestation(duties[0].duty.committeeIndex, slot);
const attestationNoCommittee = await this.produceAttestation(slot);

// Step 1. Mutate, and sign `Attestation` for each validator. Then publish all `Attestations` in one go
await this.signAndPublishAttestations(slot, attestationNoCommittee, duties);
Expand Down Expand Up @@ -96,11 +96,8 @@ export class AttestationService {
* Beacon node's endpoint produceAttestationData return data is not dependant on committeeIndex.
* For a validator client with many validators this allows to do a single call for all committees
* in a slot, saving resources in both the vc and beacon node
*
* A committee index is still passed in for the benefit of downstream tooling -
* see https://github.com/ChainSafe/lodestar/issues/4687
*/
private async produceAttestation(committeeIndex: number, slot: Slot): Promise<phase0.AttestationData> {
protected async produceAttestation(slot: Slot, committeeIndex = 0): Promise<phase0.AttestationData> {
// Produce one attestation data per slot and committeeIndex
const res = await this.api.validator.produceAttestationData(committeeIndex, slot);
ApiError.assert(res, "Error producing attestation");
Expand All @@ -111,7 +108,7 @@ export class AttestationService {
* Only one `Attestation` is downloaded from the BN. It is then signed by each
* validator and the list of individually-signed `Attestation` objects is returned to the BN.
*/
private async signAndPublishAttestations(
protected async signAndPublishAttestations(
slot: Slot,
attestationNoCommittee: phase0.AttestationData,
duties: AttDutyAndProof[]
Expand All @@ -124,18 +121,36 @@ export class AttestationService {
duties.map(async ({duty}) => {
const index = duty.committeeIndex;
const attestationData: phase0.AttestationData = {...attestationNoCommittee, index};
const logCtxValidator = {slot, index, head: headRootHex, validatorIndex: duty.validatorIndex};

try {
signedAttestations.push(await this.validatorStore.signAttestation(duty, attestationData, currentEpoch));
this.logger.debug("Signed attestation", logCtxValidator);
} catch (e) {
this.metrics?.attestaterError.inc({error: "sign"});
this.logger.error("Error signing attestation", logCtxValidator, e as Error);
}
const signedAttestation = await this.signAttestation(duty, attestationData, currentEpoch, slot, headRootHex);
if (signedAttestation) signedAttestations.push(signedAttestation);
})
);

await this.publishAttestations(slot, signedAttestations);
}

protected async signAttestation(
duty: routes.validator.AttesterDuty,
attestationData: phase0.AttestationData,
currentEpoch: number,
slot: Slot,
headRootHex: string
): Promise<phase0.Attestation | null> {
const logCtxValidator = {slot, index: duty.committeeIndex, head: headRootHex, validatorIndex: duty.validatorIndex};

try {
const signedAttestation = await this.validatorStore.signAttestation(duty, attestationData, currentEpoch);
this.logger.debug("Signed attestation", logCtxValidator);
return signedAttestation;
} catch (e) {
this.metrics?.attestaterError.inc({error: "sign"});
this.logger.error("Error signing attestation", logCtxValidator, e as Error);
return null;
}
}

protected async publishAttestations(slot: Slot, signedAttestations: phase0.Attestation[]): Promise<void> {
// signAndPublishAttestations() may be called before the 1/3 cutoff time if the block was received early.
// If we produced the block or we got the block sooner than our peers, our attestations can be dropped because
// they reach our peers before the block. To prevent that, we wait 2 extra seconds AFTER block arrival, but
Expand Down Expand Up @@ -173,7 +188,7 @@ export class AttestationService {
* by each validator and the list of individually-signed `SignedAggregateAndProof` objects is
* returned to the BN.
*/
private async produceAndPublishAggregates(
protected async produceAndPublishAggregates(
attestation: phase0.AttestationData,
duties: AttDutyAndProof[]
): Promise<void> {
Expand Down
65 changes: 65 additions & 0 deletions packages/validator/src/services/distributedAttestation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import {toHexString} from "@chainsafe/ssz";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {phase0, Slot} from "@lodestar/types";
import {sleep} from "@lodestar/utils";
import {AttestationService, AttestationServiceArgs} from "./attestation.js";
import {groupAttDutiesByCommitteeIndex} from "./utils.js";

export class DistributedAttestationService extends AttestationService {
constructor(...args: AttestationServiceArgs) {
super(...args);
}

protected runAttestationTasks = async (slot: Slot, signal: AbortSignal): Promise<void> => {
// Fetch info first so a potential delay is absorbed by the sleep() below
const duties = this.dutiesService.getDutiesAtSlot(slot);
if (duties.length === 0) {
return;
}

// A validator should create and broadcast the attestation to the associated attestation subnet when either
// (a) the validator has received a valid block from the expected block proposer for the assigned slot or
// (b) one-third of the slot has transpired (SECONDS_PER_SLOT / 3 seconds after the start of slot) -- whichever comes first.
await Promise.race([sleep(this.clock.msToSlot(slot + 1 / 3), signal), this.emitter.waitForBlockSlot(slot)]);
this.metrics?.attesterStepCallProduceAttestation.observe(this.clock.secFromSlot(slot + 1 / 3));

const dutiesByCommitteeIndex = groupAttDutiesByCommitteeIndex(duties);

// Step 1. Sign `Attestation` for each validator
const signedAttestations: phase0.Attestation[] = [];

await Promise.all(
Array.from(dutiesByCommitteeIndex.entries()).map(async ([index, duties]) => {
const attestationData = await this.produceAttestation(slot, index);
const headRootHex = toHexString(attestationData.beaconBlockRoot);
const currentEpoch = computeEpochAtSlot(slot);

duties.map(async ({duty}) => {
const signedAttestation = await this.signAttestation(duty, attestationData, currentEpoch, slot, headRootHex);
if (signedAttestation) signedAttestations.push(signedAttestation);
});
})
);

// Step 2. Publish all `Attestations` in one go
await this.publishAttestations(slot, signedAttestations);

This comment has been minimized.

Copy link
@nflaig

nflaig Mar 13, 2023

Author Member

@dapplion one difference between my implementation and the one you proposed in this commit is that here we only call submitPoolAttestations once for all committees and not for each committee.

However, Lighthouse also calls submitPoolAttestations per committee and it is probably fine to do this either way.

This comment has been minimized.

Copy link
@dapplion

dapplion Mar 14, 2023

Contributor

It has to be that way, otherwise you may be delaying operations. If committee X promise takes long to produce an attestation it would delay the calls for all other committees. That's why the original optimization was implemented


// Step 3. after all attestations are submitted, make an aggregate.
// First, wait until the `aggregation_production_instant` (2/3rds of the way though the slot)
await sleep(this.clock.msToSlot(slot + 2 / 3), signal);
this.metrics?.attesterStepCallProduceAggregate.observe(this.clock.secFromSlot(slot + 2 / 3));

// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and `committeeIndex`.
await Promise.all(
Array.from(dutiesByCommitteeIndex.entries()).map(async ([index, duties]) => {
const attestation = signedAttestations.find((a) => a.data.index === index);
if (attestation) {
await this.produceAndPublishAggregates(attestation.data, duties);
} else {
this.logger.error("Error finding attestation to produce aggregate", {slot, index});
}
})
);
};
}
13 changes: 9 additions & 4 deletions packages/validator/src/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import {computeEpochAtSlot, getCurrentSlot} from "@lodestar/state-transition";
import {Clock, IClock} from "./util/clock.js";
import {waitForGenesis} from "./genesis.js";
import {BlockProposingService} from "./services/block.js";
import {AttestationService} from "./services/attestation.js";
import {AttestationService, AttestationServiceArgs} from "./services/attestation.js";
import {DistributedAttestationService} from "./services/distributedAttestation.js";
import {IndicesService} from "./services/indices.js";
import {SyncCommitteeService} from "./services/syncCommittee.js";
import {pollPrepareBeaconProposer, pollBuilderValidatorRegistration} from "./services/prepareBeaconProposer.js";
Expand All @@ -36,6 +37,7 @@ export type ValidatorOptions = {
doppelgangerProtectionEnabled?: boolean;
closed?: boolean;
valProposerConfig?: ValidatorProposerConfig;
distributed?: boolean;
};

// TODO: Extend the timeout, and let it be customizable
Expand Down Expand Up @@ -115,16 +117,19 @@ export class Validator {

this.blockProposingService = new BlockProposingService(config, loggerVc, api, clock, validatorStore, metrics);

this.attestationService = new AttestationService(
const attestationServiceArgs: AttestationServiceArgs = [
loggerVc,
api,
clock,
validatorStore,
emitter,
chainHeaderTracker,
metrics,
{afterBlockDelaySlotFraction: opts.afterBlockDelaySlotFraction}
);
{afterBlockDelaySlotFraction: opts.afterBlockDelaySlotFraction},
];
this.attestationService = opts.distributed
? new DistributedAttestationService(...attestationServiceArgs)
: new AttestationService(...attestationServiceArgs);

this.syncCommitteeService = new SyncCommitteeService(
config,
Expand Down

0 comments on commit 2c323b6

Please sign in to comment.