Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle block forks #584

Merged
merged 6 commits into from
Dec 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {assembleAttestation} from "../../../chain/factory/attestation";
import {Attestation, BeaconState, BLSPubkey, CommitteeIndex, Slot} from "@chainsafe/eth2.0-types";
import {Attestation, BLSPubkey, CommitteeIndex, Slot} from "@chainsafe/eth2.0-types";
import {IBeaconDb} from "../../../db/api";
import {IBeaconChain} from "../../../chain";
import {IBeaconConfig} from "@chainsafe/eth2.0-config";
import {clone} from "@chainsafe/ssz";
import {processSlots} from "@chainsafe/eth2.0-state-transition";

export async function produceAttestation(
{config, db, chain}: {config: IBeaconConfig; db: IBeaconDb; chain: IBeaconChain},
Expand All @@ -12,11 +12,12 @@ export async function produceAttestation(
slot: Slot
): Promise<Attestation|null> {
try {
const [headState, headBlock, validatorIndex] = await Promise.all([
clone(chain.latestState, config.types.BeaconState) as BeaconState,
const [headBlock, validatorIndex] = await Promise.all([
db.block.get(chain.forkChoice.head()),
db.getValidatorIndex(validatorPubKey)
]);
const headState = await db.state.get(headBlock.stateRoot);
await processSlots(config, headState, slot);
twoeths marked this conversation as resolved.
Show resolved Hide resolved
return await assembleAttestation({config, db}, headState, headBlock, validatorIndex, index, slot);
} catch (e) {
throw e;
Expand Down
60 changes: 28 additions & 32 deletions packages/lodestar/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {getEmptyBlock, initializeBeaconStateFromEth1, isValidGenesisState} from

import {processSlots, stateTransition,
computeEpochAtSlot,
computeStartSlotAtEpoch,
getAttestingIndices,
isActiveValidator
,getCurrentSlot} from "@chainsafe/eth2.0-state-transition";
Expand All @@ -30,7 +31,6 @@ import {IChainOptions} from "./options";
import {OpPool} from "../opPool";
import {Block} from "ethers/providers";
import fs from "fs";
import {sleep} from "../util/sleep";
import {AsyncQueue, queue} from "async";
import FastPriorityQueue from "fastpriorityqueue";

Expand Down Expand Up @@ -118,18 +118,19 @@ export class BeaconChain extends (EventEmitter as { new(): ChainEventEmitter })
public async receiveAttestation(attestation: Attestation): Promise<void> {
const attestationHash = hashTreeRoot(attestation, this.config.types.Attestation);
this.logger.info(`Received attestation ${attestationHash.toString("hex")}`);
const latestState = this.latestState;
try {
const attestationSlot: Slot = attestation.data.slot;
if(attestationSlot + this.config.params.SLOTS_PER_EPOCH < latestState.slot) {
const headBlock = await this.db.block.get(this.forkChoice.head());
const state = await this.db.state.get(headBlock.stateRoot);
if(attestationSlot + this.config.params.SLOTS_PER_EPOCH < state.slot) {
this.logger.verbose(`Attestation ${attestationHash.toString("hex")} is too old. Ignored.`);
return;
}
} catch (e) {
return;
}
this.attestationProcessingQueue.push(async () => {
return this.processAttestation(latestState, attestation, attestationHash);
return this.processAttestation(attestation, attestationHash);
});
}

Expand All @@ -142,35 +143,28 @@ export class BeaconChain extends (EventEmitter as { new(): ChainEventEmitter })

if(!await this.db.block.has(block.parentRoot)) {
this.emit("unknownBlockRoot", block.parentRoot);
this.blockProcessingQueue.add(block);
return;
}

if(block.slot <= this.latestState.slot) {
this.logger.warn(
`Block ${blockHash.toString("hex")} is in past. ` +
"Probably fork choice/double propose/processed block. Ignored for now."
);
if(await this.db.block.has(blockHash)) {
this.logger.warn(`Block ${blockHash} existed already, no need to process it.`)
return;
}

if(block.slot > this.latestState.slot) {
//either block came too early or we are suppose to skip some slots
const latestBlock = await this.db.block.getChainHead();
if(!block.parentRoot.equals(signingRoot(latestBlock, this.config.types.BeaconBlock))){
//block processed too early
this.logger.warn(`Block ${blockHash.toString("hex")} tried to be processed too early. Requeue...`);
//wait a bit to give new block a chance
await sleep(500);
// add to priority queue
this.blockProcessingQueue.add(block);
return;
}
const finalizedCheckpoint = this.forkChoice.getFinalized();
if(block.slot < computeStartSlotAtEpoch(finalizedCheckpoint.epoch + 1)) {
this.logger.warn(
`Block ${blockHash.toString("hex")} is not after ` +
`finalized checkpoint ${finalizedCheckpoint.root.toString("hex")}.`
);
return;
}

await this.processBlock(block, blockHash);
const nextBlockInQueue = this.blockProcessingQueue.peek();
while (nextBlockInQueue) {
const latestBlock = await this.db.block.getChainHead();
if (nextBlockInQueue.parentRoot.equals(signingRoot(latestBlock, this.config.types.BeaconBlock))) {
if (await this.db.block.has(nextBlockInQueue.parentRoot)) {
await this.processBlock(nextBlockInQueue, signingRoot(nextBlockInQueue, this.config.types.BeaconBlock));
this.blockProcessingQueue.poll();
} else{
Expand Down Expand Up @@ -243,16 +237,19 @@ export class BeaconChain extends (EventEmitter as { new(): ChainEventEmitter })
return Math.floor(Date.now() / 1000) >= stateSlotTime;
}

private processAttestation = async (latestState: BeaconState, attestation: Attestation, attestationHash: Hash) => {
const currentSlot = getCurrentSlot(this.config, latestState.genesisTime);
private processAttestation = async (attestation: Attestation, attestationHash: Hash) => {
const justifiedCheckpoint = this.forkChoice.getJustified();
const justifiedBlock = await this.db.block.get(justifiedCheckpoint.root);
const checkpointState = await this.db.state.get(justifiedBlock.stateRoot);
const currentSlot = getCurrentSlot(this.config, checkpointState.genesisTime);
const currentEpoch = computeEpochAtSlot(this.config, currentSlot);
const previousEpoch = currentEpoch > GENESIS_EPOCH ? currentEpoch - 1 : GENESIS_EPOCH;
const epoch = attestation.data.target.epoch;
assert([currentEpoch, previousEpoch].includes(epoch));

const validators = getAttestingIndices(
this.config, latestState, attestation.data, attestation.aggregationBits);
const balances = validators.map((index) => latestState.balances[index]);
this.config, checkpointState, attestation.data, attestation.aggregationBits);
const balances = validators.map((index) => checkpointState.balances[index]);
for (let i = 0; i < validators.length; i++) {
this.forkChoice.addAttestation(attestation.data.beaconBlockRoot, validators[i], balances[i]);
}
Expand All @@ -261,12 +258,12 @@ export class BeaconChain extends (EventEmitter as { new(): ChainEventEmitter })
};

private processBlock = async (block: BeaconBlock, blockHash: Hash) => {

const isValidBlock = await this.isValidBlock(this.latestState, block);
const parentBlock = await this.db.block.get(block.parentRoot);
const pre = await this.db.state.get(parentBlock.stateRoot);
const isValidBlock = await this.isValidBlock(pre, block);
assert(isValidBlock);
this.logger.info(`0x${blockHash.toString("hex")} is valid, running state transition...`);

const pre = this.latestState;
// process current slot
const post = await this.runStateTransition(block, pre);

Expand Down Expand Up @@ -335,10 +332,9 @@ export class BeaconChain extends (EventEmitter as { new(): ChainEventEmitter })
this.db.block.set(blockRoot, block),
this.db.state.set(block.stateRoot, newState),
]);
await this.db.setChainHeadRoots(blockRoot, block.stateRoot);
this.forkChoice.addBlock(block.slot, blockRoot, block.parentRoot, newState.currentJustifiedCheckpoint,
newState.finalizedCheckpoint);
// await this.applyForkChoiceRule();
await this.applyForkChoiceRule();
await this.updateDepositMerkleTree(newState);
twoeths marked this conversation as resolved.
Show resolved Hide resolved
// update metrics
this.metrics.currentSlot.set(block.slot);
Expand Down
4 changes: 0 additions & 4 deletions packages/lodestar/src/chain/factory/attestation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ export async function assembleAttestation(
validatorIndex: ValidatorIndex,
index: CommitteeIndex,
slot: Slot): Promise<Attestation> {
while(state.slot < slot) {
state.slot++;
}

const committee = getBeaconCommittee(config, state, computeEpochAtSlot(config, slot), index);
const aggregationBits = getAggregationBits(committee, validatorIndex);
try {
Expand Down
2 changes: 2 additions & 0 deletions packages/lodestar/src/chain/forkChoice/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ export interface ILMDGHOST {
finalizedCheckpoint: Checkpoint): void;
addAttestation(blockRootBuf: Hash, attester: ValidatorIndex, weight: Gwei): void;
head(): Hash;
getJustified(): Checkpoint;
getFinalized(): Checkpoint;
}
14 changes: 14 additions & 0 deletions packages/lodestar/src/chain/forkChoice/statefulDag/lmdGhost.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,20 @@ export class StatefulDagLMDGHOST implements ILMDGHOST {
return true;
}

public getJustified(): Checkpoint {
if (!this.justified) {
return null;
}
return {root: Buffer.from(this.justified.node.blockRoot, "hex"), epoch: this.justified.epoch};
}

public getFinalized(): Checkpoint {
if (!this.finalized) {
return null;
}
return {root: Buffer.from(this.finalized.node.blockRoot, "hex"), epoch: this.finalized.epoch};
}

private setFinalized(checkpoint: Checkpoint): void {
this.synced = false;
const rootHex = checkpoint.root.toString("hex");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ describe("assemble attestation", function () {
//TODO: try to test if validator bit is correctly set
expect(result).to.not.be.null;
expect(result.data).to.be.equal(attestationData);
expect(state.slot).to.be.equal(2);
expect(state.slot).to.be.equal(1);
expect(assembleAttestationDataStub.calledOnceWith(dbStub, state, generateEmptyBlock(), 2));
});

Expand Down