diff --git a/packages/beacon-node/src/execution/engine/emitter.ts b/packages/beacon-node/src/execution/engine/emitter.ts index 8329f2600b44..f53b95596be3 100644 --- a/packages/beacon-node/src/execution/engine/emitter.ts +++ b/packages/beacon-node/src/execution/engine/emitter.ts @@ -3,11 +3,11 @@ import StrictEventEmitter from "strict-event-emitter-types"; import {ExecutionEngineState} from "./interface.js"; export const enum ExecutionEngineEvent { - stateChange = "stateChange", + StateChange = "StateChange", } export type ExecutionEngineEvents = { - [ExecutionEngineEvent.stateChange]: (oldState: ExecutionEngineState, newState: ExecutionEngineState) => void; + [ExecutionEngineEvent.StateChange]: (oldState: ExecutionEngineState, newState: ExecutionEngineState) => void; }; export class ExecutionEngineEventEmitter extends (EventEmitter as { diff --git a/packages/beacon-node/src/execution/engine/http.ts b/packages/beacon-node/src/execution/engine/http.ts index fa104d4085f6..4cff30a5d4da 100644 --- a/packages/beacon-node/src/execution/engine/http.ts +++ b/packages/beacon-node/src/execution/engine/http.ts @@ -1,13 +1,12 @@ import {Root, RootHex, allForks, Wei} from "@lodestar/types"; import {SLOTS_PER_EPOCH, ForkName, ForkSeq} from "@lodestar/params"; import {Logger} from "@lodestar/logger"; -import {ErrorJsonRpcResponse, HttpRpcError, isFetchError} from "../../eth1/provider/jsonRpcHttpClient.js"; +import {ErrorJsonRpcResponse, HttpRpcError} from "../../eth1/provider/jsonRpcHttpClient.js"; import {IJsonRpcHttpClient, ReqOpts} from "../../eth1/provider/jsonRpcHttpClient.js"; import {Metrics} from "../../metrics/index.js"; import {JobItemQueue} from "../../util/queue/index.js"; import {EPOCHS_PER_BATCH} from "../../sync/constants.js"; import {numToQuantity} from "../../eth1/provider/utils.js"; -import {RpcPayload} from "../../eth1/interface.js"; import { ExecutePayloadStatus, ExecutePayloadResponse, @@ -32,6 +31,7 @@ import { deserializeExecutionPayloadBody, } from "./types.js"; import {ExecutionEngineEvent, ExecutionEngineEventEmitter} from "./emitter.js"; +import {getExecutionEngineState} from "./utils.js"; export type ExecutionEngineModules = { signal: AbortSignal; @@ -88,7 +88,12 @@ const getPayloadOpts: ReqOpts = {routeId: "getPayload"}; export class ExecutionEngineHttp implements IExecutionEngine { readonly emitter = new ExecutionEngineEventEmitter(); private logger: Logger; - private state: ExecutionEngineState = ExecutionEngineState.OFFLINE; + + // The default state is SYNCING, it will be updated to SYNCING once we receive the first payload + // This assumption is better than the OFFLINE state, since we can't be sure if the EL is offline and being offline may trigger some notifications + // It's safer to to avoid false positives and assume that the EL is syncing until we receive the first payload + private state: ExecutionEngineState = ExecutionEngineState.SYNCING; + readonly payloadIdCache = new PayloadIdCache(); /** * A queue to serialize the fcUs and newPayloads calls: @@ -184,18 +189,20 @@ export class ExecutionEngineHttp implements IExecutionEngine { }; } - const {status, latestValidHash, validationError} = await this.queueRequestAndUpdateEngineState< - EngineApiRpcReturnTypes[typeof method] - >(engineRequest) + const {status, latestValidHash, validationError} = await ( + this.rpcFetchQueue.push(engineRequest) as Promise + ) // If there are errors by EL like connection refused, internal error, they need to be // treated separate from being INVALID. For now, just pass the error upstream. .catch((e: Error): EngineApiRpcReturnTypes[typeof method] => { + this.updateEngineState(getExecutionEngineState({payloadError: e})); if (e instanceof HttpRpcError || e instanceof ErrorJsonRpcResponse) { return {status: ExecutePayloadStatus.ELERROR, latestValidHash: null, validationError: e.message}; } else { return {status: ExecutePayloadStatus.UNAVAILABLE, latestValidHash: null, validationError: e.message}; } }); + this.updateEngineState(getExecutionEngineState({payloadStatus: status})); switch (status) { case ExecutePayloadStatus.VALID: @@ -276,11 +283,11 @@ export class ExecutionEngineHttp implements IExecutionEngine { const fcUReqOpts = payloadAttributes !== undefined ? forkchoiceUpdatedV1Opts : {...forkchoiceUpdatedV1Opts, retryAttempts: 1}; - const request = this.queueRequestAndUpdateEngineState({ + const request = this.rpcFetchQueue.push({ method, params: [{headBlockHash, safeBlockHash, finalizedBlockHash}, payloadAttributesRpc], methodOpts: fcUReqOpts, - }); + }) as Promise; const response = await request; const { @@ -288,6 +295,8 @@ export class ExecutionEngineHttp implements IExecutionEngine { payloadId, } = response; + this.updateEngineState(getExecutionEngineState({payloadStatus: status})); + switch (status) { case ExecutePayloadStatus.VALID: // if payloadAttributes are provided, a valid payloadId is expected @@ -338,7 +347,7 @@ export class ExecutionEngineHttp implements IExecutionEngine { : ForkSeq[fork] >= ForkSeq.capella ? "engine_getPayloadV2" : "engine_getPayloadV1"; - const payloadResponse = await this.requestAndUpdateEngineState< + const payloadResponse = await this.rpc.fetchWithRetries< EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method] >( @@ -358,7 +367,7 @@ export class ExecutionEngineHttp implements IExecutionEngine { async getPayloadBodiesByHash(blockHashes: RootHex[]): Promise<(ExecutionPayloadBody | null)[]> { const method = "engine_getPayloadBodiesByHashV1"; assertReqSizeLimit(blockHashes.length, 32); - const response = await this.requestAndUpdateEngineState< + const response = await this.rpc.fetchWithRetries< EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method] >({method, params: blockHashes}); @@ -373,7 +382,7 @@ export class ExecutionEngineHttp implements IExecutionEngine { assertReqSizeLimit(blockCount, 32); const start = numToQuantity(startBlockNumber); const count = numToQuantity(blockCount); - const response = await this.requestAndUpdateEngineState< + const response = await this.rpc.fetchWithRetries< EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method] >({method, params: [start, count]}); @@ -384,83 +393,28 @@ export class ExecutionEngineHttp implements IExecutionEngine { return this.state; } - private queueRequestAndUpdateEngineState(payload: EngineRequest): Promise { - try { - return (this.rpcFetchQueue.push(payload) as Promise).then((response) => { - if (this.state !== ExecutionEngineState.SYNCED) { - // Update the state of the execution engine in async to avoid blocking the request - void this.fetchAndUpdateEngineState().catch((err) => { - this.logger.error("Error updating execution engine state", err); - }); - } - - return response; - }); - } catch (err) { - // Update the state of the execution engine async to avoid blocking the request - void this.fetchAndUpdateEngineState().catch((err) => { - this.logger.error("Error updating execution engine state", err); - }); - - throw err; - } - } - - private async requestAndUpdateEngineState(payload: RpcPayload

, opts?: ReqOpts): Promise { - try { - const response = await this.rpc.fetchWithRetries(payload, opts); - - if (this.state !== ExecutionEngineState.SYNCED) { - // Update the state of the execution engine in async to avoid blocking the request - void this.fetchAndUpdateEngineState(); - } - - return response; - } catch (err) { - // Update the state of the execution engine async to avoid blocking the request - void this.fetchAndUpdateEngineState(); - throw err; - } - } - - private async fetchAndUpdateEngineState(): Promise { + private updateEngineState(newState: ExecutionEngineState): void { const oldState = this.state; - try { - const response = await this.rpc.fetch< - boolean | {startingBlock: string; currentBlock: string; highestBlock: string} - >({ - method: "eth_syncing", - params: [], - }); - - if (typeof response === "boolean" && response === false) { - this.state = ExecutionEngineState.SYNCED; - } else { - this.state = ExecutionEngineState.SYNCING; - } - - if (oldState === ExecutionEngineState.OFFLINE) { - this.logger.info("ExecutionEngine is back online"); - } - } catch (err) { - if (isFetchError(err) && (err.code === "EACCES" || err.code === "ECONNRESET")) { - this.state = ExecutionEngineState.AUTH_FAILED; + if (oldState === newState) return; + + switch (newState) { + case ExecutionEngineState.SYNCED: + this.logger.info("ExecutionEngine online and synced"); + break; + case ExecutionEngineState.SYNCING: + this.logger.info("ExecutionEngine is online and syncing"); + break; + case ExecutionEngineState.OFFLINE: + this.logger.error("ExecutionEngine went offline"); + break; + case ExecutionEngineState.AUTH_FAILED: this.logger.error("ExecutionEngine authentication failed"); - } else if (isFetchError(err) && err.code === "ECONNREFUSED") { - this.state = ExecutionEngineState.OFFLINE; - - if (oldState !== ExecutionEngineState.OFFLINE) { - this.logger.error("ExecutionEngine went offline"); - } - } else { - throw err; - } + break; } - if (oldState !== this.state) { - this.emitter.emit(ExecutionEngineEvent.stateChange, oldState, this.state); - } + this.state = newState; + this.emitter.emit(ExecutionEngineEvent.StateChange, oldState, newState); } } diff --git a/packages/beacon-node/src/execution/engine/utils.ts b/packages/beacon-node/src/execution/engine/utils.ts index cbdd51930ba3..a8f6739d9345 100644 --- a/packages/beacon-node/src/execution/engine/utils.ts +++ b/packages/beacon-node/src/execution/engine/utils.ts @@ -1,5 +1,6 @@ import {IJson, RpcPayload} from "../../eth1/interface.js"; -import {IJsonRpcHttpClient} from "../../eth1/provider/jsonRpcHttpClient.js"; +import {IJsonRpcHttpClient, isFetchError} from "../../eth1/provider/jsonRpcHttpClient.js"; +import {ExecutePayloadStatus, ExecutionEngineState} from "./interface.js"; export type JsonRpcBackend = { // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -27,3 +28,42 @@ export class ExecutionEngineMockJsonRpcClient implements IJsonRpcHttpClient { return Promise.all(rpcPayloadArr.map((payload) => this.fetch(payload))); } } + +export function getExecutionEngineState({ + payloadError, + payloadStatus, +}: + | {payloadStatus: ExecutePayloadStatus; payloadError?: never} + | {payloadStatus?: never; payloadError: unknown}): ExecutionEngineState { + switch (payloadStatus) { + case ExecutePayloadStatus.ACCEPTED: + case ExecutePayloadStatus.VALID: + case ExecutePayloadStatus.UNSAFE_OPTIMISTIC_STATUS: + return ExecutionEngineState.SYNCED; + + case ExecutePayloadStatus.ELERROR: + case ExecutePayloadStatus.INVALID: + case ExecutePayloadStatus.SYNCING: + case ExecutePayloadStatus.INVALID_BLOCK_HASH: + return ExecutionEngineState.SYNCING; + + case ExecutePayloadStatus.UNAVAILABLE: + return ExecutionEngineState.OFFLINE; + } + + if (payloadError && isFetchError(payloadError) && payloadError.code === "ECONNREFUSED") { + return ExecutionEngineState.OFFLINE; + } + + if ( + payloadError && + isFetchError(payloadError) && + (payloadError.code === "ECONNRESET" || payloadError.code === "ECONNABORTED") + ) { + return ExecutionEngineState.AUTH_FAILED; + } + + // In case we can't determine the state, we assume it's syncing + // This assumption is better than considering offline, because the offline state may trigger some notifications + return ExecutionEngineState.SYNCING; +} diff --git a/packages/beacon-node/src/node/nodejs.ts b/packages/beacon-node/src/node/nodejs.ts index 8173a1063e33..00a9161220e9 100644 --- a/packages/beacon-node/src/node/nodejs.ts +++ b/packages/beacon-node/src/node/nodejs.ts @@ -18,11 +18,9 @@ import {BeaconChain, IBeaconChain, initBeaconMetrics} from "../chain/index.js"; import {createMetrics, Metrics, HttpMetricsServer, getHttpMetricsServer} from "../metrics/index.js"; import {MonitoringService} from "../monitoring/index.js"; import {getApi, BeaconRestApiServer} from "../api/index.js"; -import {initializeExecutionEngine, initializeExecutionBuilder, ExecutionEngineState} from "../execution/index.js"; +import {initializeExecutionEngine, initializeExecutionBuilder} from "../execution/index.js"; import {initializeEth1ForBlockProduction} from "../eth1/index.js"; import {initCKZG, loadEthereumTrustedSetup, TrustedFileMode} from "../util/kzg.js"; -import {ExecutionEngineEvent} from "../execution/engine/emitter.js"; -import {ZERO_HASH_HEX} from "../constants/constants.js"; import {IBeaconNodeOptions} from "./options.js"; import {runNodeNotifier} from "./notifier.js"; @@ -224,17 +222,19 @@ export class BeaconNode { : undefined, }); - executionEngine.emitter.addListener(ExecutionEngineEvent.stateChange, async (oldState, newState) => { - // When execution engine come online notify forkchoice of the current state - if (oldState === ExecutionEngineState.OFFLINE && newState !== ExecutionEngineState.AUTH_FAILED) { - const fork = config.getForkName(chain.forkChoice.getHead().slot); - const headBlockHash = chain.forkChoice.getHead().executionPayloadBlockHash ?? ZERO_HASH_HEX; - const safeBlockHash = chain.forkChoice.getJustifiedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX; - const finalizedBlockHash = chain.forkChoice.getFinalizedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX; + // This seems to be the behavior of Lighthouse, but we think its not necessary and can be removed + // https://github.com/sigp/lighthouse/blob/c25825a5393aca2cde6e33dd673f8c074c2b543b/beacon_node/execution_layer/src/engines.rs#L252-L259 + // executionEngine.emitter.addListener(ExecutionEngineEvent.StateChange, async (oldState, newState) => { + // // When execution engine come online we can notify forkchoice of the current state + // if (oldState === ExecutionEngineState.OFFLINE && newState !== ExecutionEngineState.AUTH_FAILED) { + // const fork = config.getForkName(chain.forkChoice.getHead().slot); + // const headBlockHash = chain.forkChoice.getHead().executionPayloadBlockHash ?? ZERO_HASH_HEX; + // const safeBlockHash = chain.forkChoice.getJustifiedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX; + // const finalizedBlockHash = chain.forkChoice.getFinalizedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX; - await executionEngine.notifyForkchoiceUpdate(fork, headBlockHash, safeBlockHash, finalizedBlockHash); - } - }); + // await executionEngine.notifyForkchoiceUpdate(fork, headBlockHash, safeBlockHash, finalizedBlockHash); + // } + // }); // Load persisted data from disk to in-memory caches await chain.loadFromDisk(); diff --git a/packages/beacon-node/test/e2e/api/impl/beacon/node/endpoints.test.ts b/packages/beacon-node/test/e2e/api/impl/beacon/node/endpoints.test.ts index 1dacdcc0b94b..62ed9c113484 100644 --- a/packages/beacon-node/test/e2e/api/impl/beacon/node/endpoints.test.ts +++ b/packages/beacon-node/test/e2e/api/impl/beacon/node/endpoints.test.ts @@ -6,6 +6,7 @@ import {ApiError} from "@lodestar/api"; import {LogLevel, testLogger} from "../../../../../utils/logger.js"; import {getDevBeaconNode} from "../../../../../utils/node/beacon.js"; import {BeaconNode} from "../../../../../../src/node/nodejs.js"; +import {getAndInitDevValidators} from "../../../../../utils/node/validator.js"; describe("beacon node api", function () { this.timeout("30s"); @@ -79,14 +80,20 @@ describe("beacon node api", function () { }, chain: {blsVerifyAllMainThread: true}, }, - validatorCount, + validatorCount: 5, logger: testLogger("Node-A", {level: LogLevel.info}), }); + const {validators} = await getAndInitDevValidators({ + node: bn, + validatorClientCount: 1, + validatorsPerClient: 5, + startIndex: 0, + }); // Wait for a block to be produced, so that node can have communication with execution engine await new Promise((resolve) => { bn.chain.emitter.on(routes.events.EventType.head, async (head) => { - if (head.slot > 0) { + if (head.slot > 2) { resolve(head); } }); @@ -96,6 +103,8 @@ describe("beacon node api", function () { ApiError.assert(res); expect(res.response.data.elOffline).to.eql(false); + + await Promise.all(validators.map((v) => v.close())); }); }); });