Skip to content

Commit

Permalink
Use the payload status to detect engine state
Browse files Browse the repository at this point in the history
  • Loading branch information
nazarhussain committed Jul 13, 2023
1 parent 4949901 commit 1181f64
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 101 deletions.
4 changes: 2 additions & 2 deletions packages/beacon-node/src/execution/engine/emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import StrictEventEmitter from "strict-event-emitter-types";
import {ExecutionEngineState} from "./interface.js";

export const enum ExecutionEngineEvent {
stateChange = "stateChange",
StateChange = "StateChange",
}

export type ExecutionEngineEvents = {
[ExecutionEngineEvent.stateChange]: (oldState: ExecutionEngineState, newState: ExecutionEngineState) => void;
[ExecutionEngineEvent.StateChange]: (oldState: ExecutionEngineState, newState: ExecutionEngineState) => void;
};

export class ExecutionEngineEventEmitter extends (EventEmitter as {
Expand Down
120 changes: 37 additions & 83 deletions packages/beacon-node/src/execution/engine/http.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import {Root, RootHex, allForks, Wei} from "@lodestar/types";
import {SLOTS_PER_EPOCH, ForkName, ForkSeq} from "@lodestar/params";
import {Logger} from "@lodestar/logger";
import {ErrorJsonRpcResponse, HttpRpcError, isFetchError} from "../../eth1/provider/jsonRpcHttpClient.js";
import {ErrorJsonRpcResponse, HttpRpcError} from "../../eth1/provider/jsonRpcHttpClient.js";
import {IJsonRpcHttpClient, ReqOpts} from "../../eth1/provider/jsonRpcHttpClient.js";
import {Metrics} from "../../metrics/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {EPOCHS_PER_BATCH} from "../../sync/constants.js";
import {numToQuantity} from "../../eth1/provider/utils.js";
import {RpcPayload} from "../../eth1/interface.js";
import {
ExecutePayloadStatus,
ExecutePayloadResponse,
Expand All @@ -32,6 +31,7 @@ import {
deserializeExecutionPayloadBody,
} from "./types.js";
import {ExecutionEngineEvent, ExecutionEngineEventEmitter} from "./emitter.js";
import {getExecutionEngineState} from "./utils.js";

export type ExecutionEngineModules = {
signal: AbortSignal;
Expand Down Expand Up @@ -88,7 +88,12 @@ const getPayloadOpts: ReqOpts = {routeId: "getPayload"};
export class ExecutionEngineHttp implements IExecutionEngine {
readonly emitter = new ExecutionEngineEventEmitter();
private logger: Logger;
private state: ExecutionEngineState = ExecutionEngineState.OFFLINE;

// The default state is SYNCING; it will be updated from the payload status once we receive the first payload response
// This assumption is better than the OFFLINE state, since we can't be sure if the EL is offline and being offline may trigger some notifications
// It's safer to avoid false positives and assume that the EL is syncing until we receive the first payload
private state: ExecutionEngineState = ExecutionEngineState.SYNCING;

readonly payloadIdCache = new PayloadIdCache();
/**
* A queue to serialize the fcUs and newPayloads calls:
Expand Down Expand Up @@ -184,18 +189,20 @@ export class ExecutionEngineHttp implements IExecutionEngine {
};
}

const {status, latestValidHash, validationError} = await this.queueRequestAndUpdateEngineState<
EngineApiRpcReturnTypes[typeof method]
>(engineRequest)
const {status, latestValidHash, validationError} = await (
this.rpcFetchQueue.push(engineRequest) as Promise<EngineApiRpcReturnTypes[typeof method]>
)
// If there are errors by EL like connection refused, internal error, they need to be
// treated separate from being INVALID. For now, just pass the error upstream.
.catch((e: Error): EngineApiRpcReturnTypes[typeof method] => {
this.updateEngineState(getExecutionEngineState({payloadError: e}));
if (e instanceof HttpRpcError || e instanceof ErrorJsonRpcResponse) {
return {status: ExecutePayloadStatus.ELERROR, latestValidHash: null, validationError: e.message};
} else {
return {status: ExecutePayloadStatus.UNAVAILABLE, latestValidHash: null, validationError: e.message};
}
});
this.updateEngineState(getExecutionEngineState({payloadStatus: status}));

switch (status) {
case ExecutePayloadStatus.VALID:
Expand Down Expand Up @@ -276,18 +283,20 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const fcUReqOpts =
payloadAttributes !== undefined ? forkchoiceUpdatedV1Opts : {...forkchoiceUpdatedV1Opts, retryAttempts: 1};

const request = this.queueRequestAndUpdateEngineState<EngineApiRpcReturnTypes[typeof method]>({
const request = this.rpcFetchQueue.push({
method,
params: [{headBlockHash, safeBlockHash, finalizedBlockHash}, payloadAttributesRpc],
methodOpts: fcUReqOpts,
});
}) as Promise<EngineApiRpcReturnTypes[typeof method]>;

const response = await request;
const {
payloadStatus: {status, latestValidHash: _latestValidHash, validationError},
payloadId,
} = response;

this.updateEngineState(getExecutionEngineState({payloadStatus: status}));

switch (status) {
case ExecutePayloadStatus.VALID:
// if payloadAttributes are provided, a valid payloadId is expected
Expand Down Expand Up @@ -338,7 +347,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {
: ForkSeq[fork] >= ForkSeq.capella
? "engine_getPayloadV2"
: "engine_getPayloadV1";
const payloadResponse = await this.requestAndUpdateEngineState<
const payloadResponse = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>(
Expand All @@ -358,7 +367,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {
async getPayloadBodiesByHash(blockHashes: RootHex[]): Promise<(ExecutionPayloadBody | null)[]> {
const method = "engine_getPayloadBodiesByHashV1";
assertReqSizeLimit(blockHashes.length, 32);
const response = await this.requestAndUpdateEngineState<
const response = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({method, params: blockHashes});
Expand All @@ -373,7 +382,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {
assertReqSizeLimit(blockCount, 32);
const start = numToQuantity(startBlockNumber);
const count = numToQuantity(blockCount);
const response = await this.requestAndUpdateEngineState<
const response = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({method, params: [start, count]});
Expand All @@ -384,83 +393,28 @@ export class ExecutionEngineHttp implements IExecutionEngine {
return this.state;
}

/**
 * Pushes an engine request onto `rpcFetchQueue` and returns the queued promise typed as `Promise<R>`.
 *
 * When the request resolves while `this.state` is not SYNCED, a state refresh is started through
 * `fetchAndUpdateEngineState()` without being awaited; a failure of that refresh is only logged.
 *
 * NOTE(review): the `try/catch` only guards the synchronous `push(...)` / `.then(...)` calls. The
 * promise is returned without `await`, so a rejected request does not enter the `catch` branch and
 * the refresh placed there is not triggered by it. Confirm whether that was intended.
 *
 * @param payload - Engine request handed to the queue.
 * @returns The promise of the queued request, resolving with its response.
 */
private queueRequestAndUpdateEngineState<R>(payload: EngineRequest): Promise<R> {
try {
return (this.rpcFetchQueue.push(payload) as Promise<R>).then((response) => {
// Only refresh while the engine is not known to be SYNCED
if (this.state !== ExecutionEngineState.SYNCED) {
// Update the state of the execution engine in async to avoid blocking the request
void this.fetchAndUpdateEngineState().catch((err) => {
this.logger.error("Error updating execution engine state", err);
});
}

// The response is handed back unchanged
return response;
});
} catch (err) {
// Update the state of the execution engine async to avoid blocking the request
// (the inner `err` below shadows the caught `err`, which is the one rethrown)
void this.fetchAndUpdateEngineState().catch((err) => {
this.logger.error("Error updating execution engine state", err);
});

throw err;
}
}

/**
 * Performs an RPC call through `this.rpc.fetchWithRetries` and starts an engine state refresh
 * alongside it.
 *
 * On success the refresh is started only while `this.state` is not SYNCED; on failure it is always
 * started and the original error is rethrown to the caller.
 *
 * NOTE(review): unlike `queueRequestAndUpdateEngineState`, the `void`-ed refresh promise has no
 * `.catch(...)` here, so a rejection of `fetchAndUpdateEngineState()` would go unhandled. Confirm.
 *
 * @param payload - RPC payload forwarded to `fetchWithRetries`.
 * @param opts - Optional request options forwarded to `fetchWithRetries`.
 * @returns The response of the RPC call.
 */
private async requestAndUpdateEngineState<R, P>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R> {
try {
const response = await this.rpc.fetchWithRetries<R, P>(payload, opts);

// Only refresh while the engine is not known to be SYNCED
if (this.state !== ExecutionEngineState.SYNCED) {
// Update the state of the execution engine in async to avoid blocking the request
void this.fetchAndUpdateEngineState();
}

return response;
} catch (err) {
// Update the state of the execution engine async to avoid blocking the request
void this.fetchAndUpdateEngineState();
throw err;
}
}

private async fetchAndUpdateEngineState(): Promise<void> {
private updateEngineState(newState: ExecutionEngineState): void {
const oldState = this.state;

try {
const response = await this.rpc.fetch<
boolean | {startingBlock: string; currentBlock: string; highestBlock: string}
>({
method: "eth_syncing",
params: [],
});

if (typeof response === "boolean" && response === false) {
this.state = ExecutionEngineState.SYNCED;
} else {
this.state = ExecutionEngineState.SYNCING;
}

if (oldState === ExecutionEngineState.OFFLINE) {
this.logger.info("ExecutionEngine is back online");
}
} catch (err) {
if (isFetchError(err) && (err.code === "EACCES" || err.code === "ECONNRESET")) {
this.state = ExecutionEngineState.AUTH_FAILED;
if (oldState === newState) return;

switch (newState) {
case ExecutionEngineState.SYNCED:
this.logger.info("ExecutionEngine online and synced");
break;
case ExecutionEngineState.SYNCING:
this.logger.info("ExecutionEngine is online and syncing");
break;
case ExecutionEngineState.OFFLINE:
this.logger.error("ExecutionEngine went offline");
break;
case ExecutionEngineState.AUTH_FAILED:
this.logger.error("ExecutionEngine authentication failed");
} else if (isFetchError(err) && err.code === "ECONNREFUSED") {
this.state = ExecutionEngineState.OFFLINE;

if (oldState !== ExecutionEngineState.OFFLINE) {
this.logger.error("ExecutionEngine went offline");
}
} else {
throw err;
}
break;
}

if (oldState !== this.state) {
this.emitter.emit(ExecutionEngineEvent.stateChange, oldState, this.state);
}
this.state = newState;
this.emitter.emit(ExecutionEngineEvent.StateChange, oldState, newState);
}
}

Expand Down
42 changes: 41 additions & 1 deletion packages/beacon-node/src/execution/engine/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {IJson, RpcPayload} from "../../eth1/interface.js";
import {IJsonRpcHttpClient} from "../../eth1/provider/jsonRpcHttpClient.js";
import {IJsonRpcHttpClient, isFetchError} from "../../eth1/provider/jsonRpcHttpClient.js";
import {ExecutePayloadStatus, ExecutionEngineState} from "./interface.js";

export type JsonRpcBackend = {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -27,3 +28,42 @@ export class ExecutionEngineMockJsonRpcClient implements IJsonRpcHttpClient {
return Promise.all(rpcPayloadArr.map((payload) => this.fetch<R>(payload)));
}
}

/**
 * Derives the execution engine state from the outcome of a payload request.
 *
 * The argument carries exactly one of:
 * - `payloadStatus`: the status reported for the payload request.
 * - `payloadError`: the error raised while performing the request.
 *
 * A recognised status decides the result directly. Otherwise the code of a fetch error is
 * inspected, and anything inconclusive is reported as SYNCING.
 *
 * @returns The `ExecutionEngineState` that corresponds to the given status or error.
 */
export function getExecutionEngineState({
  payloadError,
  payloadStatus,
}:
  | {payloadStatus: ExecutePayloadStatus; payloadError?: never}
  | {payloadStatus?: never; payloadError: unknown}): ExecutionEngineState {
  // Statuses that are treated as coming from a synced engine
  if (
    payloadStatus === ExecutePayloadStatus.ACCEPTED ||
    payloadStatus === ExecutePayloadStatus.VALID ||
    payloadStatus === ExecutePayloadStatus.UNSAFE_OPTIMISTIC_STATUS
  ) {
    return ExecutionEngineState.SYNCED;
  }

  // Statuses that are treated as coming from an engine that is reachable but still syncing
  if (
    payloadStatus === ExecutePayloadStatus.ELERROR ||
    payloadStatus === ExecutePayloadStatus.INVALID ||
    payloadStatus === ExecutePayloadStatus.SYNCING ||
    payloadStatus === ExecutePayloadStatus.INVALID_BLOCK_HASH
  ) {
    return ExecutionEngineState.SYNCING;
  }

  if (payloadStatus === ExecutePayloadStatus.UNAVAILABLE) {
    return ExecutionEngineState.OFFLINE;
  }

  // No conclusive status: fall back to the error, which only matters when it is a fetch error
  const fetchErrorCode = payloadError && isFetchError(payloadError) ? payloadError.code : undefined;

  switch (fetchErrorCode) {
    case "ECONNREFUSED":
      return ExecutionEngineState.OFFLINE;

    case "ECONNRESET":
    case "ECONNABORTED":
      return ExecutionEngineState.AUTH_FAILED;

    default:
      // In case we can't determine the state, we assume it's syncing.
      // This is preferred over OFFLINE, because the offline state may trigger some notifications.
      return ExecutionEngineState.SYNCING;
  }
}
26 changes: 13 additions & 13 deletions packages/beacon-node/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ import {BeaconChain, IBeaconChain, initBeaconMetrics} from "../chain/index.js";
import {createMetrics, Metrics, HttpMetricsServer, getHttpMetricsServer} from "../metrics/index.js";
import {MonitoringService} from "../monitoring/index.js";
import {getApi, BeaconRestApiServer} from "../api/index.js";
import {initializeExecutionEngine, initializeExecutionBuilder, ExecutionEngineState} from "../execution/index.js";
import {initializeExecutionEngine, initializeExecutionBuilder} from "../execution/index.js";
import {initializeEth1ForBlockProduction} from "../eth1/index.js";
import {initCKZG, loadEthereumTrustedSetup, TrustedFileMode} from "../util/kzg.js";
import {ExecutionEngineEvent} from "../execution/engine/emitter.js";
import {ZERO_HASH_HEX} from "../constants/constants.js";
import {IBeaconNodeOptions} from "./options.js";
import {runNodeNotifier} from "./notifier.js";

Expand Down Expand Up @@ -224,17 +222,19 @@ export class BeaconNode {
: undefined,
});

executionEngine.emitter.addListener(ExecutionEngineEvent.stateChange, async (oldState, newState) => {
// When execution engine come online notify forkchoice of the current state
if (oldState === ExecutionEngineState.OFFLINE && newState !== ExecutionEngineState.AUTH_FAILED) {
const fork = config.getForkName(chain.forkChoice.getHead().slot);
const headBlockHash = chain.forkChoice.getHead().executionPayloadBlockHash ?? ZERO_HASH_HEX;
const safeBlockHash = chain.forkChoice.getJustifiedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
const finalizedBlockHash = chain.forkChoice.getFinalizedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
// This seems to be the behavior of Lighthouse, but we think it's not necessary and can be removed
// https://github.com/sigp/lighthouse/blob/c25825a5393aca2cde6e33dd673f8c074c2b543b/beacon_node/execution_layer/src/engines.rs#L252-L259
// executionEngine.emitter.addListener(ExecutionEngineEvent.StateChange, async (oldState, newState) => {
// // When execution engine come online we can notify forkchoice of the current state
// if (oldState === ExecutionEngineState.OFFLINE && newState !== ExecutionEngineState.AUTH_FAILED) {
// const fork = config.getForkName(chain.forkChoice.getHead().slot);
// const headBlockHash = chain.forkChoice.getHead().executionPayloadBlockHash ?? ZERO_HASH_HEX;
// const safeBlockHash = chain.forkChoice.getJustifiedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
// const finalizedBlockHash = chain.forkChoice.getFinalizedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;

await executionEngine.notifyForkchoiceUpdate(fork, headBlockHash, safeBlockHash, finalizedBlockHash);
}
});
// await executionEngine.notifyForkchoiceUpdate(fork, headBlockHash, safeBlockHash, finalizedBlockHash);
// }
// });

// Load persisted data from disk to in-memory caches
await chain.loadFromDisk();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {ApiError} from "@lodestar/api";
import {LogLevel, testLogger} from "../../../../../utils/logger.js";
import {getDevBeaconNode} from "../../../../../utils/node/beacon.js";
import {BeaconNode} from "../../../../../../src/node/nodejs.js";
import {getAndInitDevValidators} from "../../../../../utils/node/validator.js";

describe("beacon node api", function () {
this.timeout("30s");
Expand Down Expand Up @@ -79,14 +80,20 @@ describe("beacon node api", function () {
},
chain: {blsVerifyAllMainThread: true},
},
validatorCount,
validatorCount: 5,
logger: testLogger("Node-A", {level: LogLevel.info}),
});
const {validators} = await getAndInitDevValidators({
node: bn,
validatorClientCount: 1,
validatorsPerClient: 5,
startIndex: 0,
});

// Wait for a block to be produced, so that node can have communication with execution engine
await new Promise((resolve) => {
bn.chain.emitter.on(routes.events.EventType.head, async (head) => {
if (head.slot > 0) {
if (head.slot > 2) {
resolve(head);
}
});
Expand All @@ -96,6 +103,8 @@ describe("beacon node api", function () {
ApiError.assert(res);

expect(res.response.data.elOffline).to.eql(false);

await Promise.all(validators.map((v) => v.close()));
});
});
});

0 comments on commit 1181f64

Please sign in to comment.