Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support el_offline in eth/v1/node/syncing #5723

Merged
merged 22 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/api/src/beacon/routes/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ export type SyncingStatus = {
isSyncing: boolean;
/** Set to true if the node is optimistically tracking head. */
isOptimistic: boolean;
/** Set to true if the connected el client is offline */
elOffline: boolean;
};

export enum NodeHealth {
Expand Down
2 changes: 1 addition & 1 deletion packages/api/test/unit/beacon/testData/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export const testData: GenericServerTestCases<Api> = {
},
getSyncingStatus: {
args: [],
res: {data: {headSlot: "1", syncDistance: "2", isSyncing: false, isOptimistic: true}},
res: {data: {headSlot: "1", syncDistance: "2", isSyncing: false, isOptimistic: true, elOffline: false}},
},
getHealth: {
args: [],
Expand Down
10 changes: 10 additions & 0 deletions packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ import {encodeJwtToken} from "./jwt.js";
const maxStringLengthToPrint = 500;
const REQUEST_TIMEOUT = 30 * 1000;

// As we are using `cross-fetch` which does not support for types for errors
// We can't use `node-fetch` for browser compatibility
export type FetchError = {
errno: string;
code: string;
};

export const isFetchError = (error: unknown): error is FetchError =>
(error as FetchError) !== undefined && "code" in (error as FetchError) && "errno" in (error as FetchError);

interface RpcResponse<R> extends RpcResponseError {
result?: R;
}
Expand Down
6 changes: 5 additions & 1 deletion packages/beacon-node/src/execution/engine/disabled.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {IExecutionEngine, PayloadIdCache} from "./interface.js";
import {ExecutionEngineState, IExecutionEngine, PayloadIdCache} from "./interface.js";

export class ExecutionEngineDisabled implements IExecutionEngine {
readonly payloadIdCache = new PayloadIdCache();
Expand Down Expand Up @@ -26,4 +26,8 @@ export class ExecutionEngineDisabled implements IExecutionEngine {
getPayloadBodiesByRange(): Promise<never> {
throw Error("Execution engine disabled");
}

getState(): ExecutionEngineState {
throw Error("Execution engine disabled");
}
}
73 changes: 58 additions & 15 deletions packages/beacon-node/src/execution/engine/http.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {Root, RootHex, allForks, Wei} from "@lodestar/types";
import {SLOTS_PER_EPOCH, ForkName, ForkSeq} from "@lodestar/params";

import {Logger} from "@lodestar/logger";
import {ErrorJsonRpcResponse, HttpRpcError} from "../../eth1/provider/jsonRpcHttpClient.js";
import {IJsonRpcHttpClient, ReqOpts} from "../../eth1/provider/jsonRpcHttpClient.js";
import {Metrics} from "../../metrics/index.js";
Expand All @@ -15,6 +15,7 @@ import {
PayloadAttributes,
BlobsBundle,
VersionedHashes,
ExecutionEngineState,
} from "./interface.js";
import {PayloadIdCache} from "./payloadIdCache.js";
import {
Expand All @@ -29,10 +30,12 @@ import {
assertReqSizeLimit,
deserializeExecutionPayloadBody,
} from "./types.js";
import {getExecutionEngineState} from "./utils.js";

export type ExecutionEngineModules = {
signal: AbortSignal;
metrics?: Metrics | null;
logger: Logger;
};

export type ExecutionEngineHttpOpts = {
Expand Down Expand Up @@ -82,6 +85,13 @@ const getPayloadOpts: ReqOpts = {routeId: "getPayload"};
* https://github.com/ethereum/execution-apis/blob/v1.0.0-alpha.1/src/engine/interop/specification.md
*/
export class ExecutionEngineHttp implements IExecutionEngine {
private logger: Logger;

// The default state is SYNCING, it will be updated to SYNCING once we receive the first payload
// This assumption is better than the OFFLINE state, since we can't be sure if the EL is offline and being offline may trigger some notifications
// It's safer to to avoid false positives and assume that the EL is syncing until we receive the first payload
private state: ExecutionEngineState = ExecutionEngineState.SYNCING;

readonly payloadIdCache = new PayloadIdCache();
/**
* A queue to serialize the fcUs and newPayloads calls:
Expand All @@ -103,13 +113,14 @@ export class ExecutionEngineHttp implements IExecutionEngine {

constructor(
private readonly rpc: IJsonRpcHttpClient,
{metrics, signal}: ExecutionEngineModules
{metrics, signal, logger}: ExecutionEngineModules
) {
this.rpcFetchQueue = new JobItemQueue<[EngineRequest], EngineResponse>(
this.jobQueueProcessor,
{maxLength: QUEUE_MAX_LENGTH, maxConcurrency: 1, noYieldIfOneItem: true, signal},
metrics?.engineHttpProcessorQueue
);
this.logger = logger;
}

/**
Expand Down Expand Up @@ -152,7 +163,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {

const serializedExecutionPayload = serializeExecutionPayload(fork, executionPayload);

let engingRequest: EngineRequest;
let engineRequest: EngineRequest;
if (ForkSeq[fork] >= ForkSeq.deneb) {
if (versionedHashes === undefined) {
throw Error(`versionedHashes required in notifyNewPayload for fork=${fork}`);
Expand All @@ -165,32 +176,34 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const parentBeaconBlockRoot = serializeBeaconBlockRoot(parentBlockRoot);

const method = "engine_newPayloadV3";
engingRequest = {
engineRequest = {
method,
params: [serializedExecutionPayload, serializedVersionedHashes, parentBeaconBlockRoot],
methodOpts: notifyNewPayloadOpts,
};
} else {
const method = ForkSeq[fork] >= ForkSeq.capella ? "engine_newPayloadV2" : "engine_newPayloadV1";
engingRequest = {
engineRequest = {
method,
params: [serializedExecutionPayload],
methodOpts: notifyNewPayloadOpts,
};
}

const {status, latestValidHash, validationError} = await (
this.rpcFetchQueue.push(engingRequest) as Promise<EngineApiRpcReturnTypes[typeof method]>
this.rpcFetchQueue.push(engineRequest) as Promise<EngineApiRpcReturnTypes[typeof method]>
)
// If there are errors by EL like connection refused, internal error, they need to be
// treated separate from being INVALID. For now, just pass the error upstream.
.catch((e: Error): EngineApiRpcReturnTypes[typeof method] => {
this.updateEngineState(getExecutionEngineState({payloadError: e}));
if (e instanceof HttpRpcError || e instanceof ErrorJsonRpcResponse) {
return {status: ExecutePayloadStatus.ELERROR, latestValidHash: null, validationError: e.message};
} else {
return {status: ExecutePayloadStatus.UNAVAILABLE, latestValidHash: null, validationError: e.message};
}
});
this.updateEngineState(getExecutionEngineState({payloadStatus: status}));

switch (status) {
case ExecutePayloadStatus.VALID:
Expand Down Expand Up @@ -277,12 +290,21 @@ export class ExecutionEngineHttp implements IExecutionEngine {
methodOpts: fcUReqOpts,
}) as Promise<EngineApiRpcReturnTypes[typeof method]>;

const response = await request;
const response = await request
// If there are errors by EL like connection refused, internal error, they need to be
// treated separate from being INVALID. For now, just pass the error upstream.
.catch((e: Error): EngineApiRpcReturnTypes[typeof method] => {
this.updateEngineState(getExecutionEngineState({payloadError: e}));
throw e;
});

const {
payloadStatus: {status, latestValidHash: _latestValidHash, validationError},
payloadId,
} = response;

this.updateEngineState(getExecutionEngineState({payloadStatus: status}));

switch (status) {
case ExecutePayloadStatus.VALID:
// if payloadAttributes are provided, a valid payloadId is expected
Expand Down Expand Up @@ -356,10 +378,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const response = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({
method,
params: blockHashes,
});
>({method, params: blockHashes});
return response.map(deserializeExecutionPayloadBody);
}

Expand All @@ -374,12 +393,36 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const response = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({
method,
params: [start, count],
});
>({method, params: [start, count]});
return response.map(deserializeExecutionPayloadBody);
}

getState(): ExecutionEngineState {
return this.state;
}

private updateEngineState(newState: ExecutionEngineState): void {
const oldState = this.state;

if (oldState === newState) return;

switch (newState) {
case ExecutionEngineState.SYNCED:
this.logger.info("ExecutionEngine online and synced");
break;
case ExecutionEngineState.SYNCING:
this.logger.info("ExecutionEngine is online and syncing");
break;
case ExecutionEngineState.OFFLINE:
this.logger.error("ExecutionEngine went offline");
break;
case ExecutionEngineState.AUTH_FAILED:
this.logger.error("ExecutionEngine authentication failed");
break;
}

this.state = newState;
}
}

type EngineRequestKey = keyof EngineApiRpcParamTypes;
Expand Down
9 changes: 9 additions & 0 deletions packages/beacon-node/src/execution/engine/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ export enum ExecutePayloadStatus {
UNSAFE_OPTIMISTIC_STATUS = "UNSAFE_OPTIMISTIC_STATUS",
}

export enum ExecutionEngineState {
SYNCING = "SYNCING",
SYNCED = "SYNCED",
OFFLINE = "OFFLINE",
AUTH_FAILED = "AUTH_FAILED",
}

export type ExecutePayloadResponse =
| {status: ExecutePayloadStatus.SYNCING | ExecutePayloadStatus.ACCEPTED; latestValidHash: null; validationError: null}
| {status: ExecutePayloadStatus.VALID; latestValidHash: RootHex; validationError: null}
Expand Down Expand Up @@ -132,4 +139,6 @@ export interface IExecutionEngine {
getPayloadBodiesByHash(blockHash: DATA[]): Promise<(ExecutionPayloadBody | null)[]>;

getPayloadBodiesByRange(start: number, count: number): Promise<(ExecutionPayloadBody | null)[]>;

getState(): ExecutionEngineState;
}
41 changes: 40 additions & 1 deletion packages/beacon-node/src/execution/engine/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {IJson, RpcPayload} from "../../eth1/interface.js";
import {IJsonRpcHttpClient} from "../../eth1/provider/jsonRpcHttpClient.js";
import {IJsonRpcHttpClient, isFetchError} from "../../eth1/provider/jsonRpcHttpClient.js";
import {ExecutePayloadStatus, ExecutionEngineState} from "./interface.js";

export type JsonRpcBackend = {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -27,3 +28,41 @@ export class ExecutionEngineMockJsonRpcClient implements IJsonRpcHttpClient {
return Promise.all(rpcPayloadArr.map((payload) => this.fetch<R>(payload)));
}
}

const fatalErrorCodes = ["ECONNREFUSED", "ENOTFOUND", "EAI_AGAIN"];
const connectionErrorCodes = ["ECONNRESET", "ECONNABORTED"];

export function getExecutionEngineState({
payloadError,
payloadStatus,
}:
| {payloadStatus: ExecutePayloadStatus; payloadError?: never}
| {payloadStatus?: never; payloadError: unknown}): ExecutionEngineState {
switch (payloadStatus) {
case ExecutePayloadStatus.ACCEPTED:
case ExecutePayloadStatus.VALID:
case ExecutePayloadStatus.UNSAFE_OPTIMISTIC_STATUS:
return ExecutionEngineState.SYNCED;

case ExecutePayloadStatus.ELERROR:
case ExecutePayloadStatus.INVALID:
case ExecutePayloadStatus.SYNCING:
case ExecutePayloadStatus.INVALID_BLOCK_HASH:
return ExecutionEngineState.SYNCING;

case ExecutePayloadStatus.UNAVAILABLE:
return ExecutionEngineState.OFFLINE;
}

if (payloadError && isFetchError(payloadError) && fatalErrorCodes.includes(payloadError.code)) {
return ExecutionEngineState.OFFLINE;
}

if (payloadError && isFetchError(payloadError) && connectionErrorCodes.includes(payloadError.code)) {
return ExecutionEngineState.AUTH_FAILED;
}

// In case we can't determine the state, we assume it's syncing
// This assumption is better than considering offline, because the offline state may trigger some notifications
return ExecutionEngineState.SYNCING;
}
7 changes: 6 additions & 1 deletion packages/beacon-node/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ enum LoggerModule {
backfill = "backfill",
chain = "chain",
eth1 = "eth1",
executionEngine = "execution_engine",
nazarhussain marked this conversation as resolved.
Show resolved Hide resolved
metrics = "metrics",
monitoring = "monitoring",
network = "network",
Expand Down Expand Up @@ -209,7 +210,11 @@ export class BeaconNode {
logger: logger.child({module: LoggerModule.eth1}),
signal,
}),
executionEngine: initializeExecutionEngine(opts.executionEngine, {metrics, signal}),
executionEngine: initializeExecutionEngine(opts.executionEngine, {
metrics,
signal,
logger: logger.child({module: LoggerModule.executionEngine}),
}),
executionBuilder: opts.executionBuilder.enabled
? initializeExecutionBuilder(opts.executionBuilder, config, metrics)
: undefined,
Expand Down
6 changes: 6 additions & 0 deletions packages/beacon-node/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {Metrics} from "../metrics/index.js";
import {IBeaconChain} from "../chain/index.js";
import {ClockEvent} from "../util/clock.js";
import {GENESIS_SLOT} from "../constants/constants.js";
import {ExecutionEngineState} from "../execution/index.js";
import {IBeaconSync, SyncModules, SyncingStatus} from "./interface.js";
import {RangeSync, RangeSyncStatus, RangeSyncEvent} from "./range/range.js";
import {getPeerSyncType, PeerSyncType, peerSyncTypes} from "./utils/remoteSyncType.js";
Expand Down Expand Up @@ -80,13 +81,16 @@ export class BeaconSync implements IBeaconSync {

getSyncStatus(): SyncingStatus {
const currentSlot = this.chain.clock.currentSlot;
const elOffline = this.chain.executionEngine.getState() === ExecutionEngineState.OFFLINE;

// If we are pre/at genesis, signal ready
if (currentSlot <= GENESIS_SLOT) {
return {
headSlot: "0",
syncDistance: "0",
isSyncing: false,
isOptimistic: false,
elOffline,
};
} else {
const head = this.chain.forkChoice.getHead();
Expand All @@ -100,13 +104,15 @@ export class BeaconSync implements IBeaconSync {
syncDistance: String(currentSlot - head.slot),
isSyncing: true,
isOptimistic: isOptimisticBlock(head),
elOffline,
};
case SyncState.Synced:
return {
headSlot: String(head.slot),
syncDistance: "0",
isSyncing: false,
isOptimistic: isOptimisticBlock(head),
elOffline,
};
default:
throw new Error("Node is stopped, cannot get sync status");
Expand Down
Loading
Loading