Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support el_offline in eth/v1/node/syncing #5723

Merged
merged 22 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/api/src/beacon/routes/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ export type SyncingStatus = {
isSyncing: boolean;
/** Set to true if the node is optimistically tracking head. */
isOptimistic: boolean;
/** Set to true if the connected el client is offline */
elOffline: boolean;
};

export enum NodeHealth {
Expand Down
2 changes: 1 addition & 1 deletion packages/api/test/unit/beacon/testData/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export const testData: GenericServerTestCases<Api> = {
},
getSyncingStatus: {
args: [],
res: {data: {headSlot: "1", syncDistance: "2", isSyncing: false, isOptimistic: true}},
res: {data: {headSlot: "1", syncDistance: "2", isSyncing: false, isOptimistic: true, elOffline: false}},
},
getHealth: {
args: [],
Expand Down
10 changes: 10 additions & 0 deletions packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ import {encodeJwtToken} from "./jwt.js";
const maxStringLengthToPrint = 500;
const REQUEST_TIMEOUT = 30 * 1000;

// As we are using `cross-fetch` which does not support for types for errors
// We can't use `node-fetch` for browser compatibility
export type FetchError = {
errno: string;
code: string;
};

export const isFetchError = (error: unknown): error is FetchError =>
(error as FetchError) !== undefined && "code" in (error as FetchError) && "errno" in (error as FetchError);

interface RpcResponse<R> extends RpcResponseError {
result?: R;
}
Expand Down
8 changes: 7 additions & 1 deletion packages/beacon-node/src/execution/engine/disabled.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import {IExecutionEngine, PayloadIdCache} from "./interface.js";
import {ExecutionEngineEventEmitter} from "./emitter.js";
import {ExecutionEngineState, IExecutionEngine, PayloadIdCache} from "./interface.js";

export class ExecutionEngineDisabled implements IExecutionEngine {
readonly emitter = new ExecutionEngineEventEmitter();
readonly payloadIdCache = new PayloadIdCache();

async notifyNewPayload(): Promise<never> {
Expand All @@ -26,4 +28,8 @@ export class ExecutionEngineDisabled implements IExecutionEngine {
getPayloadBodiesByRange(): Promise<never> {
throw Error("Execution engine disabled");
}

getState(): ExecutionEngineState {
throw Error("Execution engine disabled");
}
}
15 changes: 15 additions & 0 deletions packages/beacon-node/src/execution/engine/emitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import EventEmitter from "node:events";
import StrictEventEmitter from "strict-event-emitter-types";
import {ExecutionEngineState} from "./interface.js";

export const enum ExecutionEngineEvent {
StateChange = "StateChange",
}

export type ExecutionEngineEvents = {
[ExecutionEngineEvent.StateChange]: (oldState: ExecutionEngineState, newState: ExecutionEngineState) => void;
};

export class ExecutionEngineEventEmitter extends (EventEmitter as {
new (): StrictEventEmitter<EventEmitter, ExecutionEngineEvents>;
}) {}
76 changes: 61 additions & 15 deletions packages/beacon-node/src/execution/engine/http.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {Root, RootHex, allForks, Wei} from "@lodestar/types";
import {SLOTS_PER_EPOCH, ForkName, ForkSeq} from "@lodestar/params";

import {Logger} from "@lodestar/logger";
import {ErrorJsonRpcResponse, HttpRpcError} from "../../eth1/provider/jsonRpcHttpClient.js";
import {IJsonRpcHttpClient, ReqOpts} from "../../eth1/provider/jsonRpcHttpClient.js";
import {Metrics} from "../../metrics/index.js";
Expand All @@ -15,6 +15,7 @@ import {
PayloadAttributes,
BlobsBundle,
VersionedHashes,
ExecutionEngineState,
} from "./interface.js";
import {PayloadIdCache} from "./payloadIdCache.js";
import {
Expand All @@ -29,10 +30,13 @@ import {
assertReqSizeLimit,
deserializeExecutionPayloadBody,
} from "./types.js";
import {ExecutionEngineEvent, ExecutionEngineEventEmitter} from "./emitter.js";
import {getExecutionEngineState} from "./utils.js";

export type ExecutionEngineModules = {
signal: AbortSignal;
metrics?: Metrics | null;
logger: Logger;
};

export type ExecutionEngineHttpOpts = {
Expand Down Expand Up @@ -82,6 +86,14 @@ const getPayloadOpts: ReqOpts = {routeId: "getPayload"};
* https://github.com/ethereum/execution-apis/blob/v1.0.0-alpha.1/src/engine/interop/specification.md
*/
export class ExecutionEngineHttp implements IExecutionEngine {
readonly emitter = new ExecutionEngineEventEmitter();
nazarhussain marked this conversation as resolved.
Show resolved Hide resolved
private logger: Logger;

// The default state is SYNCING, it will be updated to SYNCING once we receive the first payload
// This assumption is better than the OFFLINE state, since we can't be sure if the EL is offline and being offline may trigger some notifications
// It's safer to to avoid false positives and assume that the EL is syncing until we receive the first payload
private state: ExecutionEngineState = ExecutionEngineState.SYNCING;

readonly payloadIdCache = new PayloadIdCache();
/**
* A queue to serialize the fcUs and newPayloads calls:
Expand All @@ -103,13 +115,14 @@ export class ExecutionEngineHttp implements IExecutionEngine {

constructor(
private readonly rpc: IJsonRpcHttpClient,
{metrics, signal}: ExecutionEngineModules
{metrics, signal, logger}: ExecutionEngineModules
) {
this.rpcFetchQueue = new JobItemQueue<[EngineRequest], EngineResponse>(
this.jobQueueProcessor,
{maxLength: QUEUE_MAX_LENGTH, maxConcurrency: 1, noYieldIfOneItem: true, signal},
metrics?.engineHttpProcessorQueue
);
this.logger = logger;
}

/**
Expand Down Expand Up @@ -152,7 +165,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {

const serializedExecutionPayload = serializeExecutionPayload(fork, executionPayload);

let engingRequest: EngineRequest;
let engineRequest: EngineRequest;
if (ForkSeq[fork] >= ForkSeq.deneb) {
if (versionedHashes === undefined) {
throw Error(`versionedHashes required in notifyNewPayload for fork=${fork}`);
Expand All @@ -165,32 +178,34 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const parentBeaconBlockRoot = serializeBeaconBlockRoot(parentBlockRoot);

const method = "engine_newPayloadV3";
engingRequest = {
engineRequest = {
method,
params: [serializedExecutionPayload, serializedVersionedHashes, parentBeaconBlockRoot],
methodOpts: notifyNewPayloadOpts,
};
} else {
const method = ForkSeq[fork] >= ForkSeq.capella ? "engine_newPayloadV2" : "engine_newPayloadV1";
engingRequest = {
engineRequest = {
method,
params: [serializedExecutionPayload],
methodOpts: notifyNewPayloadOpts,
};
}

const {status, latestValidHash, validationError} = await (
this.rpcFetchQueue.push(engingRequest) as Promise<EngineApiRpcReturnTypes[typeof method]>
this.rpcFetchQueue.push(engineRequest) as Promise<EngineApiRpcReturnTypes[typeof method]>
)
// If there are errors by EL like connection refused, internal error, they need to be
// treated separate from being INVALID. For now, just pass the error upstream.
.catch((e: Error): EngineApiRpcReturnTypes[typeof method] => {
this.updateEngineState(getExecutionEngineState({payloadError: e}));
if (e instanceof HttpRpcError || e instanceof ErrorJsonRpcResponse) {
return {status: ExecutePayloadStatus.ELERROR, latestValidHash: null, validationError: e.message};
} else {
return {status: ExecutePayloadStatus.UNAVAILABLE, latestValidHash: null, validationError: e.message};
}
});
this.updateEngineState(getExecutionEngineState({payloadStatus: status}));

switch (status) {
case ExecutePayloadStatus.VALID:
Expand Down Expand Up @@ -277,12 +292,21 @@ export class ExecutionEngineHttp implements IExecutionEngine {
methodOpts: fcUReqOpts,
}) as Promise<EngineApiRpcReturnTypes[typeof method]>;

const response = await request;
const response = await request
// If there are errors by EL like connection refused, internal error, they need to be
// treated separate from being INVALID. For now, just pass the error upstream.
.catch((e: Error): EngineApiRpcReturnTypes[typeof method] => {
this.updateEngineState(getExecutionEngineState({payloadError: e}));
throw e;
});

const {
payloadStatus: {status, latestValidHash: _latestValidHash, validationError},
payloadId,
} = response;

this.updateEngineState(getExecutionEngineState({payloadStatus: status}));

switch (status) {
case ExecutePayloadStatus.VALID:
// if payloadAttributes are provided, a valid payloadId is expected
Expand Down Expand Up @@ -356,10 +380,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const response = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({
method,
params: blockHashes,
});
>({method, params: blockHashes});
return response.map(deserializeExecutionPayloadBody);
}

Expand All @@ -374,12 +395,37 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const response = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({
method,
params: [start, count],
});
>({method, params: [start, count]});
return response.map(deserializeExecutionPayloadBody);
}

getState(): ExecutionEngineState {
return this.state;
}

private updateEngineState(newState: ExecutionEngineState): void {
const oldState = this.state;

if (oldState === newState) return;

switch (newState) {
case ExecutionEngineState.SYNCED:
this.logger.info("ExecutionEngine online and synced");
break;
case ExecutionEngineState.SYNCING:
this.logger.info("ExecutionEngine is online and syncing");
break;
case ExecutionEngineState.OFFLINE:
this.logger.error("ExecutionEngine went offline");
break;
case ExecutionEngineState.AUTH_FAILED:
this.logger.error("ExecutionEngine authentication failed");
break;
}

this.state = newState;
this.emitter.emit(ExecutionEngineEvent.StateChange, oldState, newState);
}
}

type EngineRequestKey = keyof EngineApiRpcParamTypes;
Expand Down
11 changes: 11 additions & 0 deletions packages/beacon-node/src/execution/engine/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {Root, RootHex, allForks, capella, Wei} from "@lodestar/types";
import {DATA, QUANTITY} from "../../eth1/provider/utils.js";
import {PayloadIdCache, PayloadId, WithdrawalV1} from "./payloadIdCache.js";
import {ExecutionPayloadBody} from "./types.js";
import {ExecutionEngineEventEmitter} from "./emitter.js";

export {PayloadIdCache, PayloadId, WithdrawalV1};

Expand All @@ -30,6 +31,13 @@ export enum ExecutePayloadStatus {
UNSAFE_OPTIMISTIC_STATUS = "UNSAFE_OPTIMISTIC_STATUS",
}

export enum ExecutionEngineState {
SYNCING = "SYNCING",
SYNCED = "SYNCED",
OFFLINE = "OFFLINE",
AUTH_FAILED = "AUTH_FAILED",
}

export type ExecutePayloadResponse =
| {status: ExecutePayloadStatus.SYNCING | ExecutePayloadStatus.ACCEPTED; latestValidHash: null; validationError: null}
| {status: ExecutePayloadStatus.VALID; latestValidHash: RootHex; validationError: null}
Expand Down Expand Up @@ -80,6 +88,7 @@ export type VersionedHashes = Uint8Array[];
* - Integrated code into the same binary
*/
export interface IExecutionEngine {
emitter: ExecutionEngineEventEmitter;
payloadIdCache: PayloadIdCache;
/**
* A state transition function which applies changes to the self.execution_state.
Expand Down Expand Up @@ -132,4 +141,6 @@ export interface IExecutionEngine {
getPayloadBodiesByHash(blockHash: DATA[]): Promise<(ExecutionPayloadBody | null)[]>;

getPayloadBodiesByRange(start: number, count: number): Promise<(ExecutionPayloadBody | null)[]>;

getState(): ExecutionEngineState;
}
46 changes: 45 additions & 1 deletion packages/beacon-node/src/execution/engine/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {IJson, RpcPayload} from "../../eth1/interface.js";
import {IJsonRpcHttpClient} from "../../eth1/provider/jsonRpcHttpClient.js";
import {IJsonRpcHttpClient, isFetchError} from "../../eth1/provider/jsonRpcHttpClient.js";
import {ExecutePayloadStatus, ExecutionEngineState} from "./interface.js";

export type JsonRpcBackend = {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -27,3 +28,46 @@ export class ExecutionEngineMockJsonRpcClient implements IJsonRpcHttpClient {
return Promise.all(rpcPayloadArr.map((payload) => this.fetch<R>(payload)));
}
}

export function getExecutionEngineState({
payloadError,
payloadStatus,
}:
| {payloadStatus: ExecutePayloadStatus; payloadError?: never}
| {payloadStatus?: never; payloadError: unknown}): ExecutionEngineState {
switch (payloadStatus) {
case ExecutePayloadStatus.ACCEPTED:
case ExecutePayloadStatus.VALID:
case ExecutePayloadStatus.UNSAFE_OPTIMISTIC_STATUS:
return ExecutionEngineState.SYNCED;

case ExecutePayloadStatus.ELERROR:
case ExecutePayloadStatus.INVALID:
case ExecutePayloadStatus.SYNCING:
case ExecutePayloadStatus.INVALID_BLOCK_HASH:
return ExecutionEngineState.SYNCING;

case ExecutePayloadStatus.UNAVAILABLE:
return ExecutionEngineState.OFFLINE;
}

if (
payloadError &&
isFetchError(payloadError) &&
(payloadError.code === "ECONNREFUSED" || payloadError.code === "ENOTFOUND")
) {
return ExecutionEngineState.OFFLINE;
}

if (
payloadError &&
isFetchError(payloadError) &&
(payloadError.code === "ECONNRESET" || payloadError.code === "ECONNABORTED")
) {
return ExecutionEngineState.AUTH_FAILED;
}

// In case we can't determine the state, we assume it's syncing
// This assumption is better than considering offline, because the offline state may trigger some notifications
return ExecutionEngineState.SYNCING;
}
23 changes: 22 additions & 1 deletion packages/beacon-node/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ enum LoggerModule {
backfill = "backfill",
chain = "chain",
eth1 = "eth1",
executionEngine = "execution_engine",
nazarhussain marked this conversation as resolved.
Show resolved Hide resolved
metrics = "metrics",
monitoring = "monitoring",
network = "network",
Expand Down Expand Up @@ -195,6 +196,12 @@ export class BeaconNode {
)
: null;

const executionEngine = initializeExecutionEngine(opts.executionEngine, {
metrics,
signal,
logger: logger.child({module: LoggerModule.executionEngine}),
});

const chain = new BeaconChain(opts.chain, {
config,
db,
Expand All @@ -209,12 +216,26 @@ export class BeaconNode {
logger: logger.child({module: LoggerModule.eth1}),
signal,
}),
executionEngine: initializeExecutionEngine(opts.executionEngine, {metrics, signal}),
executionEngine,
executionBuilder: opts.executionBuilder.enabled
? initializeExecutionBuilder(opts.executionBuilder, config, metrics)
: undefined,
});

// This seems to be the behavior of Lighthouse, but we think its not necessary and can be removed
// https://github.com/sigp/lighthouse/blob/c25825a5393aca2cde6e33dd673f8c074c2b543b/beacon_node/execution_layer/src/engines.rs#L252-L259
// executionEngine.emitter.addListener(ExecutionEngineEvent.StateChange, async (oldState, newState) => {
// // When execution engine come online we can notify forkchoice of the current state
// if (oldState === ExecutionEngineState.OFFLINE && newState !== ExecutionEngineState.AUTH_FAILED) {
// const fork = config.getForkName(chain.forkChoice.getHead().slot);
// const headBlockHash = chain.forkChoice.getHead().executionPayloadBlockHash ?? ZERO_HASH_HEX;
// const safeBlockHash = chain.forkChoice.getJustifiedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
// const finalizedBlockHash = chain.forkChoice.getFinalizedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;

// await executionEngine.notifyForkchoiceUpdate(fork, headBlockHash, safeBlockHash, finalizedBlockHash);
// }
// });

// Load persisted data from disk to in-memory caches
await chain.loadFromDisk();

Expand Down
Loading
Loading