Skip to content

Commit

Permalink
feat: limit by number of epochs for forky condition
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Oct 13, 2023
1 parent abb833d commit 2d0d4ff
Show file tree
Hide file tree
Showing 15 changed files with 233 additions and 235 deletions.
7 changes: 4 additions & 3 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
isCachedBeaconState,
Index2PubkeyCache,
PubkeyIndexMap,
EpochShuffling,
} from "@lodestar/state-transition";
import {BeaconConfig} from "@lodestar/config";
import {
Expand Down Expand Up @@ -214,7 +215,7 @@ export class BeaconChain implements IBeaconChain {

this.beaconProposerCache = new BeaconProposerCache(opts);
this.checkpointBalancesCache = new CheckpointBalancesCache();
this.shufflingCache = new ShufflingCache(metrics);
this.shufflingCache = new ShufflingCache(metrics, this.opts);

// Restore state caches
// anchorState may already by a CachedBeaconState. If so, don't create the cache again, since deserializing all
Expand Down Expand Up @@ -644,7 +645,7 @@ export class BeaconChain implements IBeaconChain {
shufflingDependentRoot: RootHex,
attHeadBlock: ProtoBlock,
regenCaller: RegenCaller
): Promise<void> {
): Promise<EpochShuffling> {
// this is to prevent multiple calls to get shuffling for the same epoch and dependent root
// any subsequent calls of the same epoch and dependent root will wait for this promise to resolve
this.shufflingCache.insertPromise(attEpoch, shufflingDependentRoot);
Expand Down Expand Up @@ -673,7 +674,7 @@ export class BeaconChain implements IBeaconChain {
}

// resolve the promise to unblock other calls of the same epoch and dependent root
this.shufflingCache.processState(state, attEpoch);
return this.shufflingCache.processState(state, attEpoch);
}

/**
Expand Down
8 changes: 1 addition & 7 deletions packages/beacon-node/src/chain/errors/attestationError.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {toHexString} from "@chainsafe/ssz";
import {CommitteeIndex, Epoch, Slot, ValidatorIndex, RootHex} from "@lodestar/types";
import {Epoch, Slot, ValidatorIndex, RootHex} from "@lodestar/types";
import {GossipActionError} from "./gossipValidation.js";

export enum AttestationErrorCode {
Expand Down Expand Up @@ -65,11 +65,6 @@ export enum AttestationErrorCode {
* A signature on the attestation is invalid.
*/
INVALID_SIGNATURE = "ATTESTATION_ERROR_INVALID_SIGNATURE",
/**
* There is no committee for the slot and committee index of this attestation
* and the attestation should not have been produced.
*/
NO_COMMITTEE_FOR_SLOT_AND_INDEX = "ATTESTATION_ERROR_NO_COMMITTEE_FOR_SLOT_AND_INDEX",
/**
* The unaggregated attestation doesn't have only one aggregation bit set.
*/
Expand Down Expand Up @@ -150,7 +145,6 @@ export type AttestationErrorType =
| {code: AttestationErrorCode.HEAD_NOT_TARGET_DESCENDANT}
| {code: AttestationErrorCode.UNKNOWN_TARGET_ROOT; root: Uint8Array}
| {code: AttestationErrorCode.INVALID_SIGNATURE}
| {code: AttestationErrorCode.NO_COMMITTEE_FOR_SLOT_AND_INDEX; slot: Slot; index: CommitteeIndex}
| {code: AttestationErrorCode.NOT_EXACTLY_ONE_AGGREGATION_BIT_SET}
| {code: AttestationErrorCode.PRIOR_ATTESTATION_KNOWN; validatorIndex: ValidatorIndex; epoch: Epoch}
| {code: AttestationErrorCode.FUTURE_EPOCH; attestationEpoch: Epoch; currentEpoch: Epoch}
Expand Down
3 changes: 2 additions & 1 deletion packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex,
import {
BeaconStateAllForks,
CachedBeaconStateAllForks,
EpochShuffling,
Index2PubkeyCache,
PubkeyIndexMap,
} from "@lodestar/state-transition";
Expand Down Expand Up @@ -165,7 +166,7 @@ export interface IBeaconChain {
shufflingDependentRoot: RootHex,
attHeadBlock: ProtoBlock,
regenCaller: RegenCaller
): Promise<void>;
): Promise<EpochShuffling>;
updateBuilderStatus(clockSlot: Slot): void;

regenCanAcceptWork(): boolean;
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/chain/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import {defaultOptions as defaultValidatorOptions} from "@lodestar/validator";
import {ArchiverOpts} from "./archiver/index.js";
import {ForkChoiceOpts} from "./forkChoice/index.js";
import {LightClientServerOpts} from "./lightClient/index.js";
import {ShufflingCacheOpts} from "./shufflingCache.js";

export type IChainOptions = BlockProcessOpts &
PoolOpts &
SeenCacheOpts &
ForkChoiceOpts &
ArchiverOpts &
ShufflingCacheOpts &
LightClientServerOpts & {
blsVerifyAllMainThread?: boolean;
blsVerifyAllMultiThread?: boolean;
Expand Down
120 changes: 65 additions & 55 deletions packages/beacon-node/src/chain/shufflingCache.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {CachedBeaconStateAllForks, EpochShuffling, getShufflingDecisionBlock} from "@lodestar/state-transition";
import {Epoch, RootHex} from "@lodestar/types";
import {MapDef, pruneSetToMax} from "@lodestar/utils";
import {Metrics} from "../metrics/metrics.js";

/**
Expand All @@ -8,13 +9,13 @@ import {Metrics} from "../metrics/metrics.js";
* - don't have shuffling to verify attestations, need to do 1 epoch transition to add shuffling to this cache. This never happens
* with default chain option of maxSkipSlots = 32
**/
const MAX_SHUFFLING_CACHE_SIZE = 4;
const MAX_EPOCHS = 4;

/**
* With default chain option of maxSkipSlots = 32, there should be no shuffling promise. If that happens a lot, it could blow up Lodestar,
* with MAX_SHUFFLING_CACHE_SIZE = 4, only allow 1 promise at a time.
* with MAX_EPOCHS = 4, only allow 2 promise at a time. Note that regen already bounds number of concurrent requests at 1 already.
*/
const MAX_SHUFFLING_PROMISE = 1;
const MAX_PROMISES = 2;

enum CacheItemType {
shuffling,
Expand All @@ -23,21 +24,21 @@ enum CacheItemType {

type ShufflingCacheItem = {
type: CacheItemType.shuffling;
decisionBlockHex: RootHex;
epoch: Epoch;
shuffling: EpochShuffling;
};

type PromiseCacheItem = {
type: CacheItemType.promise;
decisionBlockHex: RootHex;
epoch: Epoch;
promise: Promise<EpochShuffling>;
resolveFn: (shuffling: EpochShuffling) => void;
};

type CacheItem = ShufflingCacheItem | PromiseCacheItem;

export type ShufflingCacheOpts = {
maxShufflingCacheEpochs?: number;
};

/**
* A shuffling cache to help:
* - get committee quickly for attestation verification
Expand All @@ -46,21 +47,31 @@ type CacheItem = ShufflingCacheItem | PromiseCacheItem;
*/
export class ShufflingCache {
/** LRU cache implemented as an array, pruned every time we add an item */
private readonly items: CacheItem[] = [];
private readonly itemsByDecisionRootByEpoch: MapDef<Epoch, Map<RootHex, CacheItem>> = new MapDef(
() => new Map<RootHex, CacheItem>()
);

private readonly maxEpochs: number;

constructor(
private readonly metrics: Metrics | null = null,
private maxSize = MAX_SHUFFLING_CACHE_SIZE
opts: ShufflingCacheOpts = {}
) {
if (metrics) {
metrics.shufflingCache.size.addCollect(() => metrics.shufflingCache.size.set(this.items.length));
metrics.shufflingCache.size.addCollect(() =>
metrics.shufflingCache.size.set(
Array.from(this.itemsByDecisionRootByEpoch.values()).reduce((total, innerMap) => total + innerMap.size, 0)
)
);
}

this.maxEpochs = opts.maxShufflingCacheEpochs ?? MAX_EPOCHS;
}

/**
* Extract shuffling from state and add to cache
*/
processState(state: CachedBeaconStateAllForks, shufflingEpoch: Epoch): void {
processState(state: CachedBeaconStateAllForks, shufflingEpoch: Epoch): EpochShuffling {
const decisionBlockHex = getShufflingDecisionBlock(state, shufflingEpoch);
let shuffling: EpochShuffling;
switch (shufflingEpoch) {
Expand All @@ -77,41 +88,45 @@ export class ShufflingCache {
throw new Error(`Shuffling not found from state ${state.slot} for epoch ${shufflingEpoch}`);
}

let found = false;
for (const item of this.items) {
if (item.epoch === shufflingEpoch && item.decisionBlockHex === decisionBlockHex) {
found = true;
if (isPromiseCacheItem(item)) {
// unblock consumers of this promise
item.resolveFn(shuffling);
// then update item type to shuffling
Object.assign(item, {type: CacheItemType.shuffling, shuffling});
// TODO: remove promise and resolveFn?
// we updated type to CacheItemType.shuffling so the above fields are not used anyway
this.metrics?.shufflingCache.processStateUpdatePromise.inc();
} else {
// ShufflingCacheItem, do nothing
this.metrics?.shufflingCache.processStateNoOp.inc();
}
break;
let cacheItem = this.itemsByDecisionRootByEpoch.getOrDefault(shufflingEpoch).get(decisionBlockHex);
if (cacheItem !== undefined) {
// update existing promise
if (isPromiseCacheItem(cacheItem)) {
// unblock consumers of this promise
cacheItem.resolveFn(shuffling);
// then update item type to shuffling
cacheItem = {
type: CacheItemType.shuffling,
shuffling,
};
this.add(shufflingEpoch, decisionBlockHex, cacheItem);
// we updated type to CacheItemType.shuffling so the above fields are not used anyway
this.metrics?.shufflingCache.processStateUpdatePromise.inc();
} else {
// ShufflingCacheItem, do nothing
this.metrics?.shufflingCache.processStateNoOp.inc();
}
}

if (!found) {
this.add({type: CacheItemType.shuffling, epoch: shufflingEpoch, decisionBlockHex, shuffling});
} else {
// not found, new shuffling
this.add(shufflingEpoch, decisionBlockHex, {type: CacheItemType.shuffling, shuffling});
this.metrics?.shufflingCache.processStateInsertNew.inc();
}

return shuffling;
}

/**
* Insert a promise to make sure we don't regen state for the same shuffling.
* Bound by MAX_SHUFFLING_PROMISE to make sure our node does not blow up.
*/
insertPromise(shufflingEpoch: Epoch, dependentRootHex: RootHex): void {
const promiseCount = this.items.filter((item) => isPromiseCacheItem(item)).length;
if (promiseCount >= MAX_SHUFFLING_PROMISE) {
insertPromise(shufflingEpoch: Epoch, decisionRootHex: RootHex): void {
const promiseCount = Array.from(this.itemsByDecisionRootByEpoch.values())
.map((innerMap) => Array.from(innerMap.values()))
.flat()
.filter((item) => isPromiseCacheItem(item)).length;
if (promiseCount >= MAX_PROMISES) {
throw new Error(
`Too many shuffling promises: ${promiseCount}, shufflingEpoch: ${shufflingEpoch}, dependentRootHex: ${dependentRootHex}`
`Too many shuffling promises: ${promiseCount}, shufflingEpoch: ${shufflingEpoch}, decisionRootHex: ${decisionRootHex}`
);
}
let resolveFn: ((shuffling: EpochShuffling) => void) | null = null;
Expand All @@ -124,12 +139,10 @@ export class ShufflingCache {

const cacheItem: PromiseCacheItem = {
type: CacheItemType.promise,
epoch: shufflingEpoch,
decisionBlockHex: dependentRootHex,
promise,
resolveFn,
};
this.add(cacheItem);
this.add(shufflingEpoch, decisionRootHex, cacheItem);
this.metrics?.shufflingCache.insertPromiseCount.inc();
}

Expand All @@ -138,26 +151,23 @@ export class ShufflingCache {
* If there's a promise, it means we are computing the same shuffling, so we wait for the promise to resolve.
* Return null if we don't have a shuffling for this epoch and dependentRootHex.
*/
async get(shufflingEpoch: Epoch, dependentRootHex: RootHex): Promise<EpochShuffling | null> {
for (const item of this.items) {
if (item.epoch === shufflingEpoch && item.decisionBlockHex === dependentRootHex) {
if (isShufflingCacheItem(item)) {
return item.shuffling;
} else {
return item.promise;
}
}
async get(shufflingEpoch: Epoch, decisionRootHex: RootHex): Promise<EpochShuffling | null> {
const cacheItem = this.itemsByDecisionRootByEpoch.getOrDefault(shufflingEpoch).get(decisionRootHex);
if (cacheItem === undefined) {
return null;
}

// not found
return null;
if (isShufflingCacheItem(cacheItem)) {
return cacheItem.shuffling;
} else {
// promise
return cacheItem.promise;
}
}

private add(cacheItem: CacheItem): void {
if (this.items.length === this.maxSize) {
this.items.shift();
}
this.items.push(cacheItem);
private add(shufflingEpoch: Epoch, decisionBlock: RootHex, cacheItem: CacheItem): void {
this.itemsByDecisionRootByEpoch.getOrDefault(shufflingEpoch).set(decisionBlock, cacheItem);
pruneSetToMax(this.itemsByDecisionRootByEpoch, this.maxEpochs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,6 @@ async function validateAggregateAndProof(
RegenCaller.validateGossipAttestation
);

if (shuffling === null) {
throw new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.NO_COMMITTEE_FOR_SLOT_AND_INDEX,
index: attIndex,
slot: attSlot,
});
}

const committeeIndices: number[] = cachedAttData
? cachedAttData.committeeIndices
: getCommitteeIndices(shuffling, attSlot, attIndex);
Expand Down
Loading

0 comments on commit 2d0d4ff

Please sign in to comment.