Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix attestation compatibility issue in distributed validator cluster #5258

Merged
merged 7 commits into from
Mar 23, 2023
3 changes: 3 additions & 0 deletions packages/cli/src/cmds/validator/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr

const {version, commit} = getVersionData();
logger.info("Lodestar", {network, version, commit});
if (args.distributed) logger.info("Client is configured to run as part of a distributed validator cluster");
dapplion marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Connecting to LevelDB database", {path: validatorPaths.validatorsDbDir});

const dbPath = validatorPaths.validatorsDbDir;
Expand Down Expand Up @@ -165,7 +166,9 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr
doppelgangerProtectionEnabled,
afterBlockDelaySlotFraction: args.afterBlockDelaySlotFraction,
scAfterBlockDelaySlotFraction: args.scAfterBlockDelaySlotFraction,
disableAttestationGrouping: args.disableAttestationGrouping,
dapplion marked this conversation as resolved.
Show resolved Hide resolved
valProposerConfig,
distributed: args.distributed,
},
metrics
);
Expand Down
17 changes: 17 additions & 0 deletions packages/cli/src/cmds/validator/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export type IValidatorCliArgs = AccountValidatorArgs &
graffiti: string;
afterBlockDelaySlotFraction?: number;
scAfterBlockDelaySlotFraction?: number;
disableAttestationGrouping?: boolean;
suggestedFeeRecipient?: string;
proposerSettingsFile?: string;
strictFeeRecipientCheck?: boolean;
Expand All @@ -52,6 +53,8 @@ export type IValidatorCliArgs = AccountValidatorArgs &
"externalSigner.pubkeys"?: string[];
"externalSigner.fetch"?: boolean;

distributed?: boolean;

interopIndexes?: string;
fromMnemonic?: string;
mnemonicIndexes?: string;
Expand Down Expand Up @@ -189,6 +192,13 @@ export const validatorOptions: CliCommandOptions<IValidatorCliArgs> = {
type: "number",
},

disableAttestationGrouping: {
nflaig marked this conversation as resolved.
Show resolved Hide resolved
hidden: true,
description:
"Disables attestation service grouping optimization, attestation tasks will be executed per committee instead of just once for all committees.",
type: "boolean",
},

proposerSettingsFile: {
description:
"A yaml file to specify detailed default and per validator pubkey customized proposer configs. PS: This feature and its format is in alpha and subject to change",
Expand Down Expand Up @@ -277,6 +287,13 @@ export const validatorOptions: CliCommandOptions<IValidatorCliArgs> = {
group: "externalSignerUrl",
},

// Distributed validator

distributed: {
description: "Enables specific features required to run as part of a distributed validator cluster",
type: "boolean",
},

// Metrics

metrics: {
Expand Down
76 changes: 61 additions & 15 deletions packages/validator/src/services/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import {groupAttDutiesByCommitteeIndex} from "./utils.js";
import {ChainHeaderTracker} from "./chainHeaderTracker.js";
import {ValidatorEventEmitter} from "./emitter.js";

type AttestationServiceOpts = {
export type AttestationServiceOpts = {
afterBlockDelaySlotFraction?: number;
disableAttestationGrouping?: boolean;
};

/**
Expand Down Expand Up @@ -64,41 +65,86 @@ export class AttestationService {
await Promise.race([sleep(this.clock.msToSlot(slot + 1 / 3), signal), this.emitter.waitForBlockSlot(slot)]);
this.metrics?.attesterStepCallProduceAttestation.observe(this.clock.secFromSlot(slot + 1 / 3));

// Beacon node's endpoint produceAttestationData return data is not dependent on committeeIndex.
if (this.opts?.disableAttestationGrouping) {
// Attestation service grouping optimization must be disabled in a distributed validator cluster as
// middleware clients such as Charon (https://github.com/ObolNetwork/charon) expect the actual committee index
// to be sent to produceAttestationData endpoint. This is required because the middleware client itself
// calls produceAttestationData on the beacon node for each validator and there is a slight chance that
// the `beacon_block_root` (LMD GHOST vote) changes between calls which would cause a conflict between
// attestations submitted by Lodestar and other VCs in the cluster, resulting in aggregation failure.
// See https://github.com/ChainSafe/lodestar/issues/5103 for further details and references.
const dutiesByCommitteeIndex = groupAttDutiesByCommitteeIndex(duties);
await Promise.all(
Array.from(dutiesByCommitteeIndex.entries()).map(([index, duties]) =>
this.runAttestationTasksPerCommittee(duties, slot, index, signal).catch((e) => {
this.logger.error("Error on attestation routine", {slot, index}, e);
nflaig marked this conversation as resolved.
Show resolved Hide resolved
})
)
);
} else {
// Beacon node's endpoint produceAttestationData return data is not dependent on committeeIndex.
// Produce a single attestation for all committees and submit unaggregated attestations in one go.
await this.runAttestationTasksGrouped(duties, slot, signal);
}
};

private async runAttestationTasksPerCommittee(
dutiesSameCommittee: AttDutyAndProof[],
slot: Slot,
index: number,
signal: AbortSignal
): Promise<void> {
// Produce attestation with actual committee index
const attestation = await this.produceAttestation(index, slot);

// Step 1. Sign `Attestation` for each validator. Then publish all `Attestations` in one go
await this.signAndPublishAttestations(slot, attestation, dutiesSameCommittee);

// Step 2. after all attestations are submitted, make an aggregate.
// First, wait until the `aggregation_production_instant` (2/3rds of the way through the slot)
await sleep(this.clock.msToSlot(slot + 2 / 3), signal);
nflaig marked this conversation as resolved.
Show resolved Hide resolved
this.metrics?.attesterStepCallProduceAggregate.observe(this.clock.secFromSlot(slot + 2 / 3));

// Then download, sign and publish a `SignedAggregateAndProof` for each
dapplion marked this conversation as resolved.
Show resolved Hide resolved
// validator that is elected to aggregate for this `slot` and `committeeIndex`.
await this.produceAndPublishAggregates(attestation, dutiesSameCommittee);
}

private async runAttestationTasksGrouped(
dutiesAll: AttDutyAndProof[],
slot: Slot,
signal: AbortSignal
): Promise<void> {
// Produce a single attestation for all committees, then clone and mutate it before signing
// Downstream tooling may require that produceAttestation is called with a 'real' committee index
// So we pick the first duty's committee index - see https://github.com/ChainSafe/lodestar/issues/4687
const attestationNoCommittee = await this.produceAttestation(duties[0].duty.committeeIndex, slot);
const attestationNoCommittee = await this.produceAttestation(0, slot);
dapplion marked this conversation as resolved.
Show resolved Hide resolved

// Step 1. Mutate, and sign `Attestation` for each validator. Then publish all `Attestations` in one go
await this.signAndPublishAttestations(slot, attestationNoCommittee, duties);
await this.signAndPublishAttestations(slot, attestationNoCommittee, dutiesAll);

// Step 2. after all attestations are submitted, make an aggregate.
// First, wait until the `aggregation_production_instant` (2/3rds of the way though the slot)
// First, wait until the `aggregation_production_instant` (2/3rds of the way through the slot)
await sleep(this.clock.msToSlot(slot + 2 / 3), signal);
this.metrics?.attesterStepCallProduceAggregate.observe(this.clock.secFromSlot(slot + 2 / 3));

const dutiesByCommitteeIndex = groupAttDutiesByCommitteeIndex(this.dutiesService.getDutiesAtSlot(slot));
const dutiesByCommitteeIndex = groupAttDutiesByCommitteeIndex(dutiesAll);
dapplion marked this conversation as resolved.
Show resolved Hide resolved

// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committeeIndex`.
// validator that is elected to aggregate for this `slot` and `committeeIndex`.
await Promise.all(
Array.from(dutiesByCommitteeIndex.entries()).map(([index, duties]) => {
Array.from(dutiesByCommitteeIndex.entries()).map(([index, dutiesSameCommittee]) => {
const attestationData: phase0.AttestationData = {...attestationNoCommittee, index};
return this.produceAndPublishAggregates(attestationData, duties);
return this.produceAndPublishAggregates(attestationData, dutiesSameCommittee);
})
);
};
}

/**
* Performs the first step of the attesting process: downloading one `Attestation` object.
* Beacon node's endpoint produceAttestationData return data is not dependent on committeeIndex.
* For a validator client with many validators this allows to do a single call for all committees
* in a slot, saving resources in both the vc and beacon node
*
* A committee index is still passed in for the benefit of downstream tooling -
* see https://github.com/ChainSafe/lodestar/issues/4687
* Note: the actual committeeIndex must be passed in if attestation grouping is disabled
*/
private async produceAttestation(committeeIndex: number, slot: Slot): Promise<phase0.AttestationData> {
// Produce one attestation data per slot and committeeIndex
Expand Down
7 changes: 6 additions & 1 deletion packages/validator/src/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ export type ValidatorOptions = {
abortController: AbortController;
afterBlockDelaySlotFraction?: number;
scAfterBlockDelaySlotFraction?: number;
disableAttestationGrouping?: boolean;
doppelgangerProtectionEnabled?: boolean;
closed?: boolean;
valProposerConfig?: ValidatorProposerConfig;
distributed?: boolean;
};

// TODO: Extend the timeout, and let it be customizable
Expand Down Expand Up @@ -123,7 +125,10 @@ export class Validator {
emitter,
chainHeaderTracker,
metrics,
{afterBlockDelaySlotFraction: opts.afterBlockDelaySlotFraction}
{
afterBlockDelaySlotFraction: opts.afterBlockDelaySlotFraction,
disableAttestationGrouping: opts.disableAttestationGrouping || opts.distributed,
}
);

this.syncCommitteeService = new SyncCommitteeService(
Expand Down
30 changes: 25 additions & 5 deletions packages/validator/test/unit/services/attestation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import bls from "@chainsafe/bls";
import {toHexString} from "@chainsafe/ssz";
import {ssz} from "@lodestar/types";
import {HttpStatusCode} from "@lodestar/api";
import {AttestationService} from "../../../src/services/attestation.js";
import {AttestationService, AttestationServiceOpts} from "../../../src/services/attestation.js";
import {AttDutyAndProof} from "../../../src/services/attestationDuties.js";
import {ValidatorStore} from "../../../src/services/validatorStore.js";
import {getApiClientStub} from "../../utils/apiStub.js";
Expand Down Expand Up @@ -37,9 +37,28 @@ describe("AttestationService", function () {

let controller: AbortController; // To stop clock
beforeEach(() => (controller = new AbortController()));
afterEach(() => controller.abort());
afterEach(() => {
controller.abort();
sandbox.resetHistory();
});

context("With attestation grouping enabled", () => {
const opts: AttestationServiceOpts = {disableAttestationGrouping: false};

it("Should produce, sign, and publish an attestation + aggregate", async () => {
await testAttestationTasks(opts);
});
});

it("Should produce, sign, and publish an attestation + aggregate", async () => {
context("With attestation grouping disabled", () => {
const opts: AttestationServiceOpts = {disableAttestationGrouping: true};

it("Should produce, sign, and publish an attestation + aggregate", async () => {
await testAttestationTasks(opts);
});
});

async function testAttestationTasks(opts?: AttestationServiceOpts): Promise<void> {
const clock = new ClockMock();
const attestationService = new AttestationService(
loggerVc,
Expand All @@ -48,7 +67,8 @@ describe("AttestationService", function () {
validatorStore,
emitter,
chainHeadTracker,
null
null,
opts
);

const attestation = ssz.phase0.Attestation.defaultValue();
Expand Down Expand Up @@ -121,5 +141,5 @@ describe("AttestationService", function () {
[[aggregate]], // 1 arg, = aggregate[]
"wrong publishAggregateAndProofs() args"
);
});
}
});