Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nflaig committed Mar 10, 2023
1 parent 4d89a0e commit 6deb360
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 31 deletions.
1 change: 1 addition & 0 deletions packages/cli/src/cmds/validator/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ export async function validatorHandler(args: IValidatorCliArgs & GlobalArgs): Pr
afterBlockDelaySlotFraction: args.afterBlockDelaySlotFraction,
scAfterBlockDelaySlotFraction: args.scAfterBlockDelaySlotFraction,
valProposerConfig,
distributed: args.distributed,
},
metrics
);
Expand Down
10 changes: 10 additions & 0 deletions packages/cli/src/cmds/validator/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ export type IValidatorCliArgs = AccountValidatorArgs &
"externalSigner.pubkeys"?: string[];
"externalSigner.fetch"?: boolean;

distributed?: boolean;

interopIndexes?: string;
fromMnemonic?: string;
mnemonicIndexes?: string;
Expand Down Expand Up @@ -277,6 +279,14 @@ export const validatorOptions: CliCommandOptions<IValidatorCliArgs> = {
group: "externalSignerUrl",
},

// Distributed validator

distributed: {
description: "Enables specific features required to run as part of a distributed validator cluster",
default: false,
type: "boolean",
},

// Metrics

metrics: {
Expand Down
69 changes: 42 additions & 27 deletions packages/validator/src/services/attestation.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {phase0, Slot, ssz} from "@lodestar/types";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {sleep} from "@lodestar/utils";
import {Api, ApiError} from "@lodestar/api";
import {Api, ApiError, routes} from "@lodestar/api";
import {toHexString} from "@chainsafe/ssz";
import {IClock, LoggerVc} from "../util/index.js";
import {PubkeyHex} from "../types.js";
Expand All @@ -16,6 +16,8 @@ type AttestationServiceOpts = {
afterBlockDelaySlotFraction?: number;
};

export type AttestationServiceArgs = ConstructorParameters<typeof AttestationService>;

/**
* Previously, submitting attestations too early may cause some attestations missed (because some clients may not queue attestations, and
* sent peers are few) so it was configured as 1/6. See https://github.com/ChainSafe/lodestar/issues/3943
Expand All @@ -29,17 +31,17 @@ const DEFAULT_AFTER_BLOCK_DELAY_SLOT_FRACTION = 0;
* Service that sets up and handles validator attester duties.
*/
export class AttestationService {
private readonly dutiesService: AttestationDutiesService;
protected readonly dutiesService: AttestationDutiesService;

constructor(
private readonly logger: LoggerVc,
private readonly api: Api,
private readonly clock: IClock,
private readonly validatorStore: ValidatorStore,
private readonly emitter: ValidatorEventEmitter,
protected readonly logger: LoggerVc,
protected readonly api: Api,
protected readonly clock: IClock,
protected readonly validatorStore: ValidatorStore,
protected readonly emitter: ValidatorEventEmitter,
chainHeadTracker: ChainHeaderTracker,
private readonly metrics: Metrics | null,
private readonly opts?: AttestationServiceOpts
protected readonly metrics: Metrics | null,
protected readonly opts?: AttestationServiceOpts
) {
this.dutiesService = new AttestationDutiesService(logger, api, clock, validatorStore, chainHeadTracker, metrics);

Expand All @@ -51,7 +53,7 @@ export class AttestationService {
this.dutiesService.removeDutiesForKey(pubkey);
}

private runAttestationTasks = async (slot: Slot, signal: AbortSignal): Promise<void> => {
protected runAttestationTasks = async (slot: Slot, signal: AbortSignal): Promise<void> => {
// Fetch info first so a potential delay is absorbed by the sleep() below
const duties = this.dutiesService.getDutiesAtSlot(slot);
if (duties.length === 0) {
Expand All @@ -66,9 +68,7 @@ export class AttestationService {

// Beacon node's endpoint produceAttestationData return data is not dependant on committeeIndex.
// Produce a single attestation for all committees, and clone mutate before signing
// Downstream tooling may require that produceAttestation is called with a 'real' committee index
// So we pick the first duty's committee index - see https://github.com/ChainSafe/lodestar/issues/4687
const attestationNoCommittee = await this.produceAttestation(duties[0].duty.committeeIndex, slot);
const attestationNoCommittee = await this.produceAttestation(slot);

// Step 1. Mutate, and sign `Attestation` for each validator. Then publish all `Attestations` in one go
await this.signAndPublishAttestations(slot, attestationNoCommittee, duties);
Expand Down Expand Up @@ -96,11 +96,8 @@ export class AttestationService {
* Beacon node's endpoint produceAttestationData return data is not dependant on committeeIndex.
* For a validator client with many validators this allows to do a single call for all committees
* in a slot, saving resources in both the vc and beacon node
*
* A committee index is still passed in for the benefit of downstream tooling -
* see https://github.com/ChainSafe/lodestar/issues/4687
*/
private async produceAttestation(committeeIndex: number, slot: Slot): Promise<phase0.AttestationData> {
protected async produceAttestation(slot: Slot, committeeIndex = 0): Promise<phase0.AttestationData> {
// Produce one attestation data per slot and committeeIndex
const res = await this.api.validator.produceAttestationData(committeeIndex, slot);
ApiError.assert(res, "Error producing attestation");
Expand All @@ -111,7 +108,7 @@ export class AttestationService {
* Only one `Attestation` is downloaded from the BN. It is then signed by each
* validator and the list of individually-signed `Attestation` objects is returned to the BN.
*/
private async signAndPublishAttestations(
protected async signAndPublishAttestations(
slot: Slot,
attestationNoCommittee: phase0.AttestationData,
duties: AttDutyAndProof[]
Expand All @@ -124,18 +121,36 @@ export class AttestationService {
duties.map(async ({duty}) => {
const index = duty.committeeIndex;
const attestationData: phase0.AttestationData = {...attestationNoCommittee, index};
const logCtxValidator = {slot, index, head: headRootHex, validatorIndex: duty.validatorIndex};

try {
signedAttestations.push(await this.validatorStore.signAttestation(duty, attestationData, currentEpoch));
this.logger.debug("Signed attestation", logCtxValidator);
} catch (e) {
this.metrics?.attestaterError.inc({error: "sign"});
this.logger.error("Error signing attestation", logCtxValidator, e as Error);
}
const signedAttestation = await this.signAttestation(duty, attestationData, currentEpoch, slot, headRootHex);
if (signedAttestation) signedAttestations.push(signedAttestation);
})
);

await this.publishAttestations(slot, signedAttestations);
}

protected async signAttestation(
duty: routes.validator.AttesterDuty,
attestationData: phase0.AttestationData,
currentEpoch: number,
slot: Slot,
headRootHex: string
): Promise<phase0.Attestation | null> {
const logCtxValidator = {slot, index: duty.committeeIndex, head: headRootHex, validatorIndex: duty.validatorIndex};

try {
const signedAttestation = await this.validatorStore.signAttestation(duty, attestationData, currentEpoch);
this.logger.debug("Signed attestation", logCtxValidator);
return signedAttestation;
} catch (e) {
this.metrics?.attestaterError.inc({error: "sign"});
this.logger.error("Error signing attestation", logCtxValidator, e as Error);
return null;
}
}

protected async publishAttestations(slot: Slot, signedAttestations: phase0.Attestation[]): Promise<void> {
// signAndPublishAttestations() may be called before the 1/3 cutoff time if the block was received early.
// If we produced the block or we got the block sooner than our peers, our attestations can be dropped because
// they reach our peers before the block. To prevent that, we wait 2 extra seconds AFTER block arrival, but
Expand Down Expand Up @@ -173,7 +188,7 @@ export class AttestationService {
* by each validator and the list of individually-signed `SignedAggregateAndProof` objects is
* returned to the BN.
*/
private async produceAndPublishAggregates(
protected async produceAndPublishAggregates(
attestation: phase0.AttestationData,
duties: AttDutyAndProof[]
): Promise<void> {
Expand Down
63 changes: 63 additions & 0 deletions packages/validator/src/services/distributedAttestation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import {toHexString} from "@chainsafe/ssz";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {phase0, Slot} from "@lodestar/types";
import {sleep} from "@lodestar/utils";
import {AttestationService, AttestationServiceArgs} from "./attestation.js";
import {groupAttDutiesByCommitteeIndex} from "./utils.js";

export class DistributedAttestationService extends AttestationService {
constructor(...args: AttestationServiceArgs) {
super(...args);
}

protected runAttestationTasks = async (slot: Slot, signal: AbortSignal): Promise<void> => {
// Fetch info first so a potential delay is absorbed by the sleep() below
const duties = this.dutiesService.getDutiesAtSlot(slot);
if (duties.length === 0) {
return;
}

// A validator should create and broadcast the attestation to the associated attestation subnet when either
// (a) the validator has received a valid block from the expected block proposer for the assigned slot or
// (b) one-third of the slot has transpired (SECONDS_PER_SLOT / 3 seconds after the start of slot) -- whichever comes first.
await Promise.race([sleep(this.clock.msToSlot(slot + 1 / 3), signal), this.emitter.waitForBlockSlot(slot)]);
this.metrics?.attesterStepCallProduceAttestation.observe(this.clock.secFromSlot(slot + 1 / 3));

const dutiesByCommitteeIndex = groupAttDutiesByCommitteeIndex(duties);

// Step 1. Sign `Attestation` for each validator
const signedAttestations: phase0.Attestation[] = [];

await Promise.all(
Array.from(dutiesByCommitteeIndex.entries()).map(async ([index, duties]) => {
const attestationData = await this.produceAttestation(slot, index);
const headRootHex = toHexString(attestationData.beaconBlockRoot);
const currentEpoch = computeEpochAtSlot(slot);

duties.map(async ({duty}) => {
const signedAttestation = await this.signAttestation(duty, attestationData, currentEpoch, slot, headRootHex);
if (signedAttestation) signedAttestations.push(signedAttestation);
});
})
);

// Step 2. Publish all `Attestations` in one go
await this.publishAttestations(slot, signedAttestations);

// Step 3. after all attestations are submitted, make an aggregate.
// First, wait until the `aggregation_production_instant` (2/3rds of the way though the slot)
await sleep(this.clock.msToSlot(slot + 2 / 3), signal);
this.metrics?.attesterStepCallProduceAggregate.observe(this.clock.secFromSlot(slot + 2 / 3));

// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committeeIndex`.
await Promise.all(
Array.from(dutiesByCommitteeIndex.entries()).map(([index, duties]) => {
// const attestation = signedAttestations.find((a) => a.data.index === index);
const attestationData: phase0.AttestationData = {...signedAttestations[0].data, index};
return this.produceAndPublishAggregates(attestationData, duties);
})
);
};
}
13 changes: 9 additions & 4 deletions packages/validator/src/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import {computeEpochAtSlot, getCurrentSlot} from "@lodestar/state-transition";
import {Clock, IClock} from "./util/clock.js";
import {waitForGenesis} from "./genesis.js";
import {BlockProposingService} from "./services/block.js";
import {AttestationService} from "./services/attestation.js";
import {AttestationService, AttestationServiceArgs} from "./services/attestation.js";
import {DistributedAttestationService} from "./services/distributedAttestation.js";
import {IndicesService} from "./services/indices.js";
import {SyncCommitteeService} from "./services/syncCommittee.js";
import {pollPrepareBeaconProposer, pollBuilderValidatorRegistration} from "./services/prepareBeaconProposer.js";
Expand All @@ -36,6 +37,7 @@ export type ValidatorOptions = {
doppelgangerProtectionEnabled?: boolean;
closed?: boolean;
valProposerConfig?: ValidatorProposerConfig;
distributed?: boolean;
};

// TODO: Extend the timeout, and let it be customizable
Expand Down Expand Up @@ -115,16 +117,19 @@ export class Validator {

this.blockProposingService = new BlockProposingService(config, loggerVc, api, clock, validatorStore, metrics);

this.attestationService = new AttestationService(
const attestationServiceArgs: AttestationServiceArgs = [
loggerVc,
api,
clock,
validatorStore,
emitter,
chainHeaderTracker,
metrics,
{afterBlockDelaySlotFraction: opts.afterBlockDelaySlotFraction}
);
{afterBlockDelaySlotFraction: opts.afterBlockDelaySlotFraction},
];
this.attestationService = opts.distributed
? new DistributedAttestationService(...attestationServiceArgs)
: new AttestationService(...attestationServiceArgs);

this.syncCommitteeService = new SyncCommitteeService(
config,
Expand Down

0 comments on commit 6deb360

Please sign in to comment.