Skip to content

Commit

Permalink
Fix attestation compatibility issue in distributed validator cluster (#…
Browse files Browse the repository at this point in the history
…5258)

* Allow to disable attestation service grouping optimization

* Refactor attestation service

* Add cli option to disable attestation service grouping optimization

* Add cli option to run as part of a distributed validator cluster

* Add unit test for running attestation tasks per committee

* Remove default value from cli options

* Catch errors during attestation routine to avoid unhandled promise rejections

---------

Co-authored-by: dapplion <35266934+dapplion@users.noreply.github.com>
  • Loading branch information
nflaig and dapplion authored Mar 23, 2023
1 parent 9cc3980 commit ac77c6f
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 21 deletions.
3 changes: 3 additions & 0 deletions packages/cli/src/cmds/validator/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr

const {version, commit} = getVersionData();
logger.info("Lodestar", {network, version, commit});
if (args.distributed) logger.info("Client is configured to run as part of a distributed validator cluster");
logger.info("Connecting to LevelDB database", {path: validatorPaths.validatorsDbDir});

const dbPath = validatorPaths.validatorsDbDir;
Expand Down Expand Up @@ -165,7 +166,9 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr
doppelgangerProtectionEnabled,
afterBlockDelaySlotFraction: args.afterBlockDelaySlotFraction,
scAfterBlockDelaySlotFraction: args.scAfterBlockDelaySlotFraction,
disableAttestationGrouping: args.disableAttestationGrouping,
valProposerConfig,
distributed: args.distributed,
},
metrics
);
Expand Down
17 changes: 17 additions & 0 deletions packages/cli/src/cmds/validator/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export type IValidatorCliArgs = AccountValidatorArgs &
graffiti: string;
afterBlockDelaySlotFraction?: number;
scAfterBlockDelaySlotFraction?: number;
disableAttestationGrouping?: boolean;
suggestedFeeRecipient?: string;
proposerSettingsFile?: string;
strictFeeRecipientCheck?: boolean;
Expand All @@ -52,6 +53,8 @@ export type IValidatorCliArgs = AccountValidatorArgs &
"externalSigner.pubkeys"?: string[];
"externalSigner.fetch"?: boolean;

distributed?: boolean;

interopIndexes?: string;
fromMnemonic?: string;
mnemonicIndexes?: string;
Expand Down Expand Up @@ -189,6 +192,13 @@ export const validatorOptions: CliCommandOptions<IValidatorCliArgs> = {
type: "number",
},

disableAttestationGrouping: {
hidden: true,
description:
"Disables attestation service grouping optimization, attestation tasks will be executed per committee instead of just once for all committees.",
type: "boolean",
},

proposerSettingsFile: {
description:
"A yaml file to specify detailed default and per validator pubkey customized proposer configs. PS: This feature and its format is in alpha and subject to change",
Expand Down Expand Up @@ -277,6 +287,13 @@ export const validatorOptions: CliCommandOptions<IValidatorCliArgs> = {
group: "externalSignerUrl",
},

// Distributed validator

distributed: {
description: "Enables specific features required to run as part of a distributed validator cluster",
type: "boolean",
},

// Metrics

metrics: {
Expand Down
76 changes: 61 additions & 15 deletions packages/validator/src/services/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import {groupAttDutiesByCommitteeIndex} from "./utils.js";
import {ChainHeaderTracker} from "./chainHeaderTracker.js";
import {ValidatorEventEmitter} from "./emitter.js";

type AttestationServiceOpts = {
export type AttestationServiceOpts = {
afterBlockDelaySlotFraction?: number;
disableAttestationGrouping?: boolean;
};

/**
Expand Down Expand Up @@ -64,41 +65,86 @@ export class AttestationService {
await Promise.race([sleep(this.clock.msToSlot(slot + 1 / 3), signal), this.emitter.waitForBlockSlot(slot)]);
this.metrics?.attesterStepCallProduceAttestation.observe(this.clock.secFromSlot(slot + 1 / 3));

// Beacon node's endpoint produceAttestationData return data is not dependant on committeeIndex.
if (this.opts?.disableAttestationGrouping) {
// Attestation service grouping optimization must be disabled in a distributed validator cluster as
// middleware clients such as Charon (https://github.com/ObolNetwork/charon) expect the actual committee index
// to be sent to produceAttestationData endpoint. This is required because the middleware client itself
// calls produceAttestationData on the beacon node for each validator and there is a slight chance that
// the `beacon_block_root` (LMD GHOST vote) changes between calls which would cause a conflict between
// attestations submitted by Lodestar and other VCs in the cluster, resulting in aggregation failure.
// See https://github.com/ChainSafe/lodestar/issues/5103 for further details and references.
const dutiesByCommitteeIndex = groupAttDutiesByCommitteeIndex(duties);
await Promise.all(
Array.from(dutiesByCommitteeIndex.entries()).map(([index, duties]) =>
this.runAttestationTasksPerCommittee(duties, slot, index, signal).catch((e) => {
this.logger.error("Error on attestation routine", {slot, index}, e);
})
)
);
} else {
// Beacon node's endpoint produceAttestationData return data is not dependant on committeeIndex.
// Produce a single attestation for all committees and submit unaggregated attestations in one go.
await this.runAttestationTasksGrouped(duties, slot, signal);
}
};

private async runAttestationTasksPerCommittee(
dutiesSameCommittee: AttDutyAndProof[],
slot: Slot,
index: number,
signal: AbortSignal
): Promise<void> {
// Produce attestation with actual committee index
const attestation = await this.produceAttestation(index, slot);

// Step 1. Sign `Attestation` for each validator. Then publish all `Attestations` in one go
await this.signAndPublishAttestations(slot, attestation, dutiesSameCommittee);

// Step 2. after all attestations are submitted, make an aggregate.
// First, wait until the `aggregation_production_instant` (2/3rds of the way through the slot)
await sleep(this.clock.msToSlot(slot + 2 / 3), signal);
this.metrics?.attesterStepCallProduceAggregate.observe(this.clock.secFromSlot(slot + 2 / 3));

// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and `committeeIndex`.
await this.produceAndPublishAggregates(attestation, dutiesSameCommittee);
}

private async runAttestationTasksGrouped(
dutiesAll: AttDutyAndProof[],
slot: Slot,
signal: AbortSignal
): Promise<void> {
// Produce a single attestation for all committees, and clone mutate before signing
// Downstream tooling may require that produceAttestation is called with a 'real' committee index
// So we pick the first duty's committee index - see https://github.com/ChainSafe/lodestar/issues/4687
const attestationNoCommittee = await this.produceAttestation(duties[0].duty.committeeIndex, slot);
const attestationNoCommittee = await this.produceAttestation(0, slot);

// Step 1. Mutate, and sign `Attestation` for each validator. Then publish all `Attestations` in one go
await this.signAndPublishAttestations(slot, attestationNoCommittee, duties);
await this.signAndPublishAttestations(slot, attestationNoCommittee, dutiesAll);

// Step 2. after all attestations are submitted, make an aggregate.
// First, wait until the `aggregation_production_instant` (2/3rds of the way though the slot)
// First, wait until the `aggregation_production_instant` (2/3rds of the way through the slot)
await sleep(this.clock.msToSlot(slot + 2 / 3), signal);
this.metrics?.attesterStepCallProduceAggregate.observe(this.clock.secFromSlot(slot + 2 / 3));

const dutiesByCommitteeIndex = groupAttDutiesByCommitteeIndex(this.dutiesService.getDutiesAtSlot(slot));
const dutiesByCommitteeIndex = groupAttDutiesByCommitteeIndex(dutiesAll);

// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committeeIndex`.
// validator that is elected to aggregate for this `slot` and `committeeIndex`.
await Promise.all(
Array.from(dutiesByCommitteeIndex.entries()).map(([index, duties]) => {
Array.from(dutiesByCommitteeIndex.entries()).map(([index, dutiesSameCommittee]) => {
const attestationData: phase0.AttestationData = {...attestationNoCommittee, index};
return this.produceAndPublishAggregates(attestationData, duties);
return this.produceAndPublishAggregates(attestationData, dutiesSameCommittee);
})
);
};
}

/**
* Performs the first step of the attesting process: downloading one `Attestation` object.
* Beacon node's endpoint produceAttestationData return data is not dependant on committeeIndex.
* For a validator client with many validators this allows to do a single call for all committees
* in a slot, saving resources in both the vc and beacon node
*
* A committee index is still passed in for the benefit of downstream tooling -
* see https://github.com/ChainSafe/lodestar/issues/4687
* Note: the actual committeeIndex must be passed in if attestation grouping is disabled
*/
private async produceAttestation(committeeIndex: number, slot: Slot): Promise<phase0.AttestationData> {
// Produce one attestation data per slot and committeeIndex
Expand Down
7 changes: 6 additions & 1 deletion packages/validator/src/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ export type ValidatorOptions = {
abortController: AbortController;
afterBlockDelaySlotFraction?: number;
scAfterBlockDelaySlotFraction?: number;
disableAttestationGrouping?: boolean;
doppelgangerProtectionEnabled?: boolean;
closed?: boolean;
valProposerConfig?: ValidatorProposerConfig;
distributed?: boolean;
};

// TODO: Extend the timeout, and let it be customizable
Expand Down Expand Up @@ -123,7 +125,10 @@ export class Validator {
emitter,
chainHeaderTracker,
metrics,
{afterBlockDelaySlotFraction: opts.afterBlockDelaySlotFraction}
{
afterBlockDelaySlotFraction: opts.afterBlockDelaySlotFraction,
disableAttestationGrouping: opts.disableAttestationGrouping || opts.distributed,
}
);

this.syncCommitteeService = new SyncCommitteeService(
Expand Down
30 changes: 25 additions & 5 deletions packages/validator/test/unit/services/attestation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import bls from "@chainsafe/bls";
import {toHexString} from "@chainsafe/ssz";
import {ssz} from "@lodestar/types";
import {HttpStatusCode} from "@lodestar/api";
import {AttestationService} from "../../../src/services/attestation.js";
import {AttestationService, AttestationServiceOpts} from "../../../src/services/attestation.js";
import {AttDutyAndProof} from "../../../src/services/attestationDuties.js";
import {ValidatorStore} from "../../../src/services/validatorStore.js";
import {getApiClientStub} from "../../utils/apiStub.js";
Expand Down Expand Up @@ -37,9 +37,28 @@ describe("AttestationService", function () {

let controller: AbortController; // To stop clock
beforeEach(() => (controller = new AbortController()));
afterEach(() => controller.abort());
afterEach(() => {
controller.abort();
sandbox.resetHistory();
});

context("With attestation grouping enabled", () => {
const opts: AttestationServiceOpts = {disableAttestationGrouping: false};

it("Should produce, sign, and publish an attestation + aggregate", async () => {
await testAttestationTasks(opts);
});
});

it("Should produce, sign, and publish an attestation + aggregate", async () => {
context("With attestation grouping disabled", () => {
const opts: AttestationServiceOpts = {disableAttestationGrouping: true};

it("Should produce, sign, and publish an attestation + aggregate", async () => {
await testAttestationTasks(opts);
});
});

async function testAttestationTasks(opts?: AttestationServiceOpts): Promise<void> {
const clock = new ClockMock();
const attestationService = new AttestationService(
loggerVc,
Expand All @@ -48,7 +67,8 @@ describe("AttestationService", function () {
validatorStore,
emitter,
chainHeadTracker,
null
null,
opts
);

const attestation = ssz.phase0.Attestation.defaultValue();
Expand Down Expand Up @@ -121,5 +141,5 @@ describe("AttestationService", function () {
[[aggregate]], // 1 arg, = aggregate[]
"wrong publishAggregateAndProofs() args"
);
});
}
});

0 comments on commit ac77c6f

Please sign in to comment.