feat: n historical states #6008

Closed
wants to merge 42 commits into from
Changes from 2 commits
42 commits
8c25f9f
feat: implement migrateState()
twoeths Sep 20, 2023
763ebde
feat: createCachedBeaconState from cached Shufflings
twoeths Sep 20, 2023
e3d1bee
feat: refactor migrateState to loadState
twoeths Sep 21, 2023
6df1b5a
feat: implement loadCachedBeaconState() api
twoeths Sep 23, 2023
e7bcece
feat: enhance checkpoint cache with persist/reload capabilities
twoeths Sep 25, 2023
a39cc5f
feat: separate to get, getOrReload, getStateOrBytes() apis
twoeths Sep 25, 2023
7214e9d
feat: consume checkpoint state cache apis
twoeths Sep 26, 2023
2b9c491
feat: refactor state cache to be LRU
twoeths Sep 26, 2023
253811d
feat: implement and use ShufflingCache
twoeths Sep 26, 2023
2390594
fix: max epochs in memory in checkpoint state cache
twoeths Sep 26, 2023
a48f8f6
feat: add cli options for state caches
twoeths Sep 26, 2023
c5dffed
feat: use relative ssz
twoeths Sep 26, 2023
c4090eb
fix: bind this in checkpointStateCache apis
twoeths Sep 26, 2023
87cca20
fix: do not add non-spec checkpoint state to cache
twoeths Sep 27, 2023
4302ecd
fix: pruneFromMemory at the last 1/3 slot of slot 0
twoeths Sep 27, 2023
6c1c720
fix: also add non-spec checkpoint state to cache
twoeths Sep 27, 2023
7d5e4f6
fix: deleteAllEpochItems should also delete inMemoryKeyOrder
twoeths Sep 27, 2023
de65f2c
fix: support reload in 0-historical state config
twoeths Sep 27, 2023
aaaa88a
fix: /eth/v1/lodestar/state_cache_items api
twoeths Sep 28, 2023
801d521
fix: regen findFirstStateBlock
twoeths Sep 28, 2023
b206639
chore: add verbose log when reloading state
twoeths Sep 28, 2023
ee2e55a
fix: correct regen iterateAncestorBlocks params
twoeths Sep 28, 2023
6fdae10
chore: getStateSync to verify attestations
twoeths Sep 28, 2023
6beaf19
chore: only remove state file if reload succesful
twoeths Oct 1, 2023
fe0883b
feat: loadState without checking same state type
twoeths Oct 1, 2023
843b824
feat: persistentCheckpointStateCache flag
twoeths Oct 2, 2023
166ee37
chore: track in-memory epochs and persistent epochs
twoeths Oct 2, 2023
96dba21
feat: persist 1 state per epoch
twoeths Oct 3, 2023
f116b93
fix: previousShuffling in loadState
twoeths Oct 3, 2023
f624126
fix: do not skip pruneCheckpointStateCache per epoch
twoeths Oct 4, 2023
0b15ae2
fix: prune per slot
twoeths Oct 5, 2023
ea56579
chore: make CPStatePersistentApis generic
twoeths Oct 5, 2023
dad4483
feat: implement db persistent option
twoeths Oct 6, 2023
64e8a4c
feat: verify attestations using ShufflingCache
twoeths Oct 8, 2023
63b156c
fix: add caller to shuffling metrics
twoeths Oct 9, 2023
99d6af5
chore: persist checkpoint states to db by default
twoeths Oct 9, 2023
4669871
feat: nHistoricalStates flag
twoeths Oct 9, 2023
d4443ab
chore: add metric to ShufflingCache
twoeths Oct 9, 2023
0da184d
fix: populate ShufflingCache in chain constructor
twoeths Oct 9, 2023
76d6f99
chore: ssz v0.14.0
twoeths Oct 30, 2023
22cf38c
fix: avoid cleaning old checkpoint states in PersistentApi constructor
twoeths Oct 30, 2023
6e16d94
fix: avoid batchDelete in DbPersistentApis
twoeths Oct 30, 2023
4 changes: 3 additions & 1 deletion packages/beacon-node/src/chain/blocks/importBlock.ts
@@ -339,7 +339,9 @@ export async function importBlock(
// it's important to add this to cache, when chain is finalized we'll query this state later
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
Contributor Author

Put this inside the if (block.message.slot % SLOTS_PER_EPOCH === 0) { condition below; otherwise we get the error "Block error slot=452161 error=Checkpoint state slot must be first in an epoch", as observed on the test branch.

this.regen.addCheckpointState(cp, checkpointState);
if (block.message.slot % SLOTS_PER_EPOCH === 0) {
twoeths marked this conversation as resolved.
this.regen.addCheckpointState(cp, checkpointState);
}

// Note: in-lined code from previous handler of ChainEvent.checkpoint
this.logger.verbose("Checkpoint processed", toCheckpointHex(cp));
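The guard discussed in this hunk can be illustrated in isolation. The following is only a minimal sketch, assuming the mainnet preset value of 32 slots per epoch; `isFirstSlotOfEpoch` and `epochOfSlot` are hypothetical helper names, not Lodestar functions. It shows why slot 452161 from the error message is rejected as a checkpoint state slot.

```typescript
// Assumption: mainnet preset, 32 slots per epoch.
const SLOTS_PER_EPOCH = 32;

// Hypothetical helper: true only for the first slot of an epoch,
// which is the only slot a checkpoint state may have.
function isFirstSlotOfEpoch(slot: number): boolean {
  return slot % SLOTS_PER_EPOCH === 0;
}

// Hypothetical helper: the epoch a slot belongs to.
function epochOfSlot(slot: number): number {
  return Math.floor(slot / SLOTS_PER_EPOCH);
}

// Slot 452161 is the second slot of its epoch, so adding its post-state
// as a checkpoint state would violate the "first in an epoch" rule.
const offendingSlot = 452161;
const canBeCheckpointSlot = isFirstSlotOfEpoch(offendingSlot);
```

Under these assumptions `canBeCheckpointSlot` is false, while the preceding slot 452160 would pass the guard.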
5 changes: 4 additions & 1 deletion packages/beacon-node/src/chain/prepareNextSlot.ts
@@ -174,8 +174,11 @@ export class PrepareNextSlotScheduler {
}
}

// Pruning at the last 1/3 slot of epoch is the safest time because all epoch transitions already use the cached checkpoint states
// one downside of this is that when `inMemoryEpochs = 0` and the gossip block hasn't come yet, we have to reload the state we added 2/3 slot ago
// however, it's unlikely that `inMemoryEpochs` is configured as 0, and this scenario rarely happens
// since we only use `inMemoryEpochs = 0` for testing, if it happens it's a good thing because it helps us test the reload flow
if (clockSlot % SLOTS_PER_EPOCH === 0) {
// Don't let the checkpoint state cache prune on its own; prune at the last 1/3 of slot 0 of each epoch
const pruneCount = this.chain.regen.pruneCheckpointStateCache();
this.logger.verbose("Pruned checkpoint state cache", {clockSlot, nextEpoch, pruneCount});
}
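The timing rule described in the comments above can be modelled as a pure function. This is a rough sketch only: it assumes mainnet values (12-second slots, 32 slots per epoch), and `isPruneWindow` is a hypothetical name. The real scheduler's timing logic is not visible in this hunk, so nothing here should be read as its implementation.

```typescript
// Assumptions: mainnet preset values.
const SECONDS_PER_SLOT = 12;
const SLOTS_PER_EPOCH = 32;

// Hypothetical helper: true when the clock is in the last third of
// slot 0 of an epoch, the window the comments name as safest for pruning.
function isPruneWindow(secondsSinceGenesis: number): boolean {
  const slot = Math.floor(secondsSinceGenesis / SECONDS_PER_SLOT);
  const secFromSlot = secondsSinceGenesis - slot * SECONDS_PER_SLOT;
  const inLastThird = secFromSlot >= (2 * SECONDS_PER_SLOT) / 3;
  return slot % SLOTS_PER_EPOCH === 0 && inLastThird;
}
```

For example, 8 seconds into slot 32 falls inside the window, while 7 seconds into slot 32 or any time in slot 33 does not.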
6 changes: 4 additions & 2 deletions packages/beacon-node/src/chain/regen/regen.ts
@@ -291,10 +291,12 @@ async function processSlotsToNearestCheckpoint(
// processSlots calls .clone() before mutating
postState = processSlots(postState, nextEpochSlot, opts, metrics);

// Non-spec checkpoint state because the root is of previous epoch
// this is usually added when we validate gossip block at the start of an epoch
// then when we process block, we don't have to do state transition again
// TODO: figure out if it's worth persisting this state to disk
// note that this state could be real checkpoint state or just a state after processing empty slots
// - if the 1st block of the epoch is skipped, it's a checkpoint state
// - if the 1st block of the epoch is processed, it's NOT a checkpoint state
// however we still need to add this state to cache to preserve epoch transitions
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
checkpointStateCache.add(cp, checkpointState);
Contributor Author

Suggested change
checkpointStateCache.add(cp, checkpointState);
if (shouldReload) {
checkpointStateCache.add(cp, checkpointState);
}

@@ -1,12 +1,11 @@
import path from "node:path";
import {toHexString} from "@chainsafe/ssz";
import {phase0, Epoch, RootHex} from "@lodestar/types";
import {CachedBeaconStateAllForks, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {CachedBeaconStateAllForks, computeStartSlotAtEpoch, getBlockRootAtSlot} from "@lodestar/state-transition";
import {Logger, MapDef, ensureDir} from "@lodestar/utils";
import {routes} from "@lodestar/api";
import {loadCachedBeaconState} from "@lodestar/state-transition";
import {Metrics} from "../../metrics/index.js";
import {LinkedList} from "../../util/array.js";
import {IClock} from "../../util/clock.js";
import {ShufflingCache} from "../shufflingCache.js";
import {MapTracker} from "./mapMetrics.js";
@@ -23,7 +22,6 @@ import {
StateFile,
CheckpointStateCache,
} from "./types.js";
import {from} from "multiformats/dist/types/src/bases/base.js";

/**
* Cache of CachedBeaconState belonging to checkpoint
@@ -35,8 +33,8 @@ import {from} from "multiformats/dist/types/src/bases/base.js";
*/
Contributor

Excellent module-wide description of the mechanism! Super helpful and appreciated. I would encourage this level of documentation on more functions and new pieces that are non-trivial.

export class PersistentCheckpointStateCache implements CheckpointStateCache {
Contributor Author

Write unit tests for the weird case where we do multiple epoch transitions with no blocks in between.

private readonly cache: MapTracker<string, CachedBeaconStateAllForks | StateFile>;
Contributor

Typing the Map value as CachedBeaconStateAllForks | string works, but you could be more explicit by defining a wrapper object. The number of items in this map is low (< 1000), so the wrapper does not add any performance penalty, and it helps readability and maintenance.

Something like:

private readonly cache: MapTracker<string, StateCacheItem>;

type StateCacheItem =
  | {type: StateCacheItemType.filePath, filepath: string}
  | {type: StateCacheItemType.cachedBeaconState, value: CachedBeaconStateAllForks}

Then in the consumer code you can be explicit about what to do in each case
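A self-contained sketch of that suggestion might look like the following. It is an illustration under stated assumptions, not the code that was merged: `StateLike` is a placeholder for `CachedBeaconStateAllForks`, `describeItem` is a hypothetical consumer, and the tag type is written as a `const` object rather than an enum purely to keep the snippet runnable in any TypeScript toolchain.

```typescript
// Placeholder for CachedBeaconStateAllForks; only `slot` is modelled here.
type StateLike = {slot: number};

// Tag values for the wrapper. An enum would work equally well.
const StateCacheItemType = {
  filePath: "filePath",
  cachedBeaconState: "cachedBeaconState",
} as const;

// Discriminated union: the `type` field tells the consumer which shape it holds.
type StateCacheItem =
  | {type: typeof StateCacheItemType.filePath; filepath: string}
  | {type: typeof StateCacheItemType.cachedBeaconState; value: StateLike};

// Hypothetical consumer: each branch is explicit, and the compiler narrows
// `item` so that `filepath` and `value` are only accessible where they exist.
function describeItem(item: StateCacheItem): string {
  switch (item.type) {
    case StateCacheItemType.filePath:
      return `persisted at ${item.filepath}`;
    case StateCacheItemType.cachedBeaconState:
      return `in memory at slot ${item.value.slot}`;
  }
}
```

Compared with `typeof stateOrFilePath === "string"` checks scattered through the class, the tagged form keeps working if the persisted representation ever stops being a plain string.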

// key order of in memory items to implement LRU cache
private readonly inMemoryKeyOrder: LinkedList<string>;
// maintain order of epoch to decide which epoch to prune from memory
private readonly inMemoryEpochs: Set<Epoch>;
/** Epoch -> Set<blockRoot> */
private readonly epochIndex = new MapDef<Epoch, Set<string>>(() => new Set<string>());
private readonly metrics: Metrics["cpStateCache"] | null | undefined;
@@ -79,12 +77,15 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
}
this.logger = logger;
this.clock = clock;
if (opts.maxEpochsInMemory < 0) {
throw new Error("maxEpochsInMemory must be >= 0");
}
this.maxEpochsInMemory = opts.maxEpochsInMemory;
// Specify different persistentApis for testing
this.persistentApis = persistentApis ?? FILE_APIS;
this.shufflingCache = shufflingCache;
this.getHeadState = getHeadState;
this.inMemoryKeyOrder = new LinkedList<string>();
this.inMemoryEpochs = new Set();
void ensureDir(CHECKPOINT_STATES_FOLDER);
Contributor

This function can throw an error; don't void it.

}

@@ -139,8 +140,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
// only remove file once we reload successfully
void this.persistentApis.removeFile(filePath);
this.cache.set(cpKey, newCachedState);
// since item is file path, cpKey is not in inMemoryKeyOrder
this.inMemoryKeyOrder.unshift(cpKey);
this.inMemoryEpochs.add(cp.epoch);
// don't prune from memory here, call it at the last 1/3 of slot 0 of an epoch
return newCachedState;
} catch (e) {
@@ -194,7 +194,6 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {

if (typeof stateOrFilePath !== "string") {
this.metrics?.stateClonedCount.observe(stateOrFilePath.clonedCount);
this.inMemoryKeyOrder.moveToHead(cpKey);
return stateOrFilePath;
}

@@ -208,23 +207,18 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
const cpHex = toCheckpointHex(cp);
const key = toCheckpointKey(cpHex);
const stateOrFilePath = this.cache.get(key);
this.inMemoryEpochs.add(cp.epoch);
if (stateOrFilePath !== undefined) {
if (typeof stateOrFilePath === "string") {
// was persisted to disk, set back to memory
this.cache.set(key, state);
void this.persistentApis.removeFile(stateOrFilePath);
this.metrics?.stateFilesRemoveCount.inc({reason: RemoveFileReason.stateUpdate});
this.inMemoryKeyOrder.unshift(key);
} else {
// already in memory
// move to head of inMemoryKeyOrder
this.inMemoryKeyOrder.moveToHead(key);
}
return;
}
this.metrics?.adds.inc();
this.cache.set(key, state);
this.inMemoryKeyOrder.unshift(key);
this.epochIndex.getOrDefault(cp.epoch).add(cpHex.rootHex);
// don't prune from memory here, call it at the last 1/3 of slot 0 of an epoch
}
@@ -300,7 +294,20 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
delete(cp: phase0.Checkpoint): void {
const key = toCheckpointKey(toCheckpointHex(cp));
this.cache.delete(key);
this.inMemoryKeyOrder.deleteFirst(key);
// check if there's any state left in memory for this epoch
let foundState = false;
for (const rootHex of this.epochIndex.get(cp.epoch)?.values() || []) {
const cpKey = toCheckpointKey({epoch: cp.epoch, rootHex});
const stateOrFilePath = this.cache.get(cpKey);
if (stateOrFilePath !== undefined && typeof stateOrFilePath !== "string") {
// this is a state
foundState = true;
break;
}
}
if (!foundState) {
this.inMemoryEpochs.delete(cp.epoch);
}
const epochKey = toHexString(cp.root);
const value = this.epochIndex.get(cp.epoch);
if (value) {
@@ -325,52 +332,85 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
// this could be improved by looping through inMemoryKeyOrder once
// however with this.maxEpochsInMemory = 2, the list is 6 maximum so it's not a big deal now
this.cache.delete(key);
this.inMemoryKeyOrder.deleteFirst(key);
}
this.inMemoryEpochs.delete(epoch);
this.epochIndex.delete(epoch);
}

/**
* This is slow code because it involves serializing the whole state to disk which takes 600ms as of Sep 2023
* This is slow code because it involves serializing the whole state to disk which takes 600ms to 900ms as of Sep 2023
* The add() is called after we process 1st block of an epoch, we don't want to pruneFromMemory at that time since it's the hot time
* Call this code at the last 1/3 slot of slot 0 of an epoch
*/
pruneFromMemory(): number {
let count = 0;
while (this.inMemoryKeyOrder.length > 0 && this.countEpochsInMemory() > this.maxEpochsInMemory) {
const key = this.inMemoryKeyOrder.last();
if (!key) {
while (this.inMemoryEpochs.size > this.maxEpochsInMemory) {
let firstEpoch: Epoch | undefined;
for (const epoch of this.inMemoryEpochs) {
firstEpoch = epoch;
break;
}
if (firstEpoch === undefined) {
// should not happen
throw new Error(`No key ${key} found in inMemoryKeyOrder}`);
throw new Error("No epoch in memory");
}
const stateOrFilePath = this.cache.get(key);
// even if stateOrFilePath is undefined or string, we still need to pop the key
this.inMemoryKeyOrder.pop();
if (stateOrFilePath !== undefined && typeof stateOrFilePath !== "string") {
// do not update epochIndex
const filePath = toTmpFilePath(key);
this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0);
const timer = this.metrics?.statePersistDuration.startTimer();
void this.persistentApis.writeIfNotExist(filePath, stateOrFilePath.serialize());
timer?.();
this.cache.set(key, filePath);
count++;
this.logger.verbose("Persist state to disk", {filePath, stateSlot: stateOrFilePath.slot});
} else {
// should not happen, log anyway
this.logger.debug(`Unexpected stateOrFilePath ${stateOrFilePath} for key ${key}`);
// first loop to check if the 1st slot of epoch is a skipped slot or not
let firstSlotBlockRoot: string | undefined;
for (const rootHex of this.epochIndex.get(firstEpoch) ?? []) {
const cpKey = toCheckpointKey({epoch: firstEpoch, rootHex});
const stateOrFilePath = this.cache.get(cpKey);
if (stateOrFilePath !== undefined && typeof stateOrFilePath !== "string") {
// this is a state
if (rootHex !== toHexString(getBlockRootAtSlot(stateOrFilePath, computeStartSlotAtEpoch(firstEpoch) - 1))) {
firstSlotBlockRoot = rootHex;
break;
}
}
}
}

return count;
}
// if firstSlotBlockRoot is found, it's a checkpoint state: persist only that checkpoint and delete the others
// if firstSlotBlockRoot is not found, the first slot of the epoch is skipped: persist the other checkpoint state, whose root is that of the last slot of the previous epoch
for (const rootHex of this.epochIndex.get(firstEpoch) ?? []) {
let toPersist = false;
let toDelete = false;
if (firstSlotBlockRoot === undefined) {
toPersist = true;
} else {
if (rootHex === firstSlotBlockRoot) {
toPersist = true;
} else {
toDelete = true;
}
}
const cpKey = toCheckpointKey({epoch: firstEpoch, rootHex});
const stateOrFilePath = this.cache.get(cpKey);
if (stateOrFilePath !== undefined && typeof stateOrFilePath !== "string") {
if (toPersist) {
// do not update epochIndex
const filePath = toTmpFilePath(cpKey);
this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0);
const timer = this.metrics?.statePersistDuration.startTimer();
void this.persistentApis.writeIfNotExist(filePath, stateOrFilePath.serialize());
timer?.();
this.cache.set(cpKey, filePath);
count++;
this.logger.verbose("Prune checkpoint state from memory and persist to disk", {
filePath,
stateSlot: stateOrFilePath.slot,
rootHex,
});
} else if (toDelete) {
this.cache.delete(cpKey);
this.metrics?.statePruneFromMemoryCount.inc();
this.logger.verbose("Prune checkpoint state from memory", {stateSlot: stateOrFilePath.slot, rootHex});
}
}
}

private countEpochsInMemory(): number {
const epochs = new Set<Epoch>();
for (const key of this.inMemoryKeyOrder) {
epochs.add(fromCheckpointKey(key).epoch);
this.inMemoryEpochs.delete(firstEpoch);
}
return epochs.size;

return count;
}
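The epoch-based pruning introduced in this hunk can be modelled with a much smaller class. What follows is a simplified sketch, not the PR's implementation: `EpochPrunedCacheSketch`, `StateLike` and `toKey` are hypothetical names, writing to disk is replaced by storing a made-up path string, and every in-memory state of the pruned epoch is persisted, whereas the PR keeps only one state per epoch and deletes the other. It relies on the fact that a `Set` iterates in insertion order, so the first entry is the earliest epoch that was added.

```typescript
type Epoch = number;
// Placeholder for CachedBeaconStateAllForks; only `slot` is modelled.
type StateLike = {slot: number};

// Hypothetical key format, for illustration only.
function toKey(epoch: Epoch, rootHex: string): string {
  return `${rootHex}_${epoch}`;
}

class EpochPrunedCacheSketch {
  // Value is either an in-memory state or the path it was "persisted" to.
  readonly cache = new Map<string, StateLike | string>();
  // Insertion-ordered: the first entry is the earliest epoch added.
  readonly inMemoryEpochs = new Set<Epoch>();
  readonly epochIndex = new Map<Epoch, Set<string>>();
  readonly maxEpochsInMemory: number;

  constructor(maxEpochsInMemory: number) {
    if (maxEpochsInMemory < 0) throw new Error("maxEpochsInMemory must be >= 0");
    this.maxEpochsInMemory = maxEpochsInMemory;
  }

  add(epoch: Epoch, rootHex: string, state: StateLike): void {
    this.cache.set(toKey(epoch, rootHex), state);
    this.inMemoryEpochs.add(epoch);
    const roots = this.epochIndex.get(epoch) ?? new Set<string>();
    roots.add(rootHex);
    this.epochIndex.set(epoch, roots);
  }

  // Replaces in-memory states of the earliest epochs with file paths
  // until at most maxEpochsInMemory epochs remain in memory.
  pruneFromMemory(): number {
    let count = 0;
    while (this.inMemoryEpochs.size > this.maxEpochsInMemory) {
      const first = this.inMemoryEpochs.values().next();
      if (first.done) break;
      const firstEpoch = first.value;
      for (const rootHex of this.epochIndex.get(firstEpoch) ?? []) {
        const key = toKey(firstEpoch, rootHex);
        const item = this.cache.get(key);
        if (item !== undefined && typeof item !== "string") {
          // Stand-in for serializing the state and writing it to disk.
          this.cache.set(key, `/tmp/${key}.ssz`);
          count++;
        }
      }
      this.inMemoryEpochs.delete(firstEpoch);
    }
    return count;
  }
}
```

With `maxEpochsInMemory = 1`, adding two states for epoch 10 and one for epoch 11 and then calling `pruneFromMemory()` replaces both epoch-10 entries with paths and leaves the epoch-11 state in memory. Tracking epochs directly avoids recomputing the epoch count from a key list on every loop iteration, which is what the removed `countEpochsInMemory()` did.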

clear(): void {
4 changes: 4 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
@@ -1048,6 +1048,10 @@ export function createLodestarMetrics(
help: "Histogram of time to persist state to memory",
buckets: [0.5, 1, 2, 4],
}),
statePruneFromMemoryCount: register.gauge({
name: "lodestar_cp_state_cache_state_prune_from_memory_count",
help: "Total number of states pruned from memory",
}),
statePersistSecFromSlot: register.histogram({
name: "lodestar_cp_state_cache_state_persist_seconds_from_slot",
help: "Histogram of time to persist state to memory from slot",