Make batch sizes dynamic for eth1 fetch of blocks/logs (#4532)
* Fix getBlocksByNumber batch concurrency

* Make the blocks and logs fetching dynamic

* Add to eth1 dashboard

* Add tests to validate dynamic mechanism
g11tech authored Sep 13, 2022
1 parent 15db5c7 commit 0886d75
Showing 5 changed files with 182 additions and 64 deletions.
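
The core of the change is an adaptive batch size maintained by the deposit tracker: when a fetch fails with a truncated JSON-RPC response or a timeout, the block range requested next time is halved (down to a floor of 10 blocks); when a fetch succeeds, it is doubled again (up to a cap of 1000 blocks). Below is a minimal standalone sketch of that adjustment pattern; `fetchRange` and `isShrinkableError` are placeholders for the provider call and error classification used in the actual tracker, not real Lodestar APIs.

```ts
const MAX_BATCH = 1000; // mirrors MAX_BLOCKS_PER_BLOCK_QUERY / MAX_BLOCKS_PER_LOG_QUERY
const MIN_BATCH = 10; // mirrors MIN_BLOCKS_PER_BLOCK_QUERY / MIN_BLOCKS_PER_LOG_QUERY

let batchSize = MAX_BATCH;

/**
 * Fetch one inclusive block range, shrinking the batch on truncation/timeout and
 * growing it again on success. The caller's polling loop decides when to retry.
 */
async function fetchNextRange<T>(
  fromBlock: number,
  remoteFollowBlock: number,
  fetchRange: (from: number, to: number) => Promise<T>,
  isShrinkableError: (e: Error) => boolean
): Promise<T> {
  const toBlock = Math.min(remoteFollowBlock, fromBlock + batchSize - 1); // block range is inclusive
  try {
    const result = await fetchRange(fromBlock, toBlock);
    batchSize = Math.min(MAX_BATCH, batchSize * 2); // recover quickly once responses succeed
    return result;
  } catch (e) {
    if (isShrinkableError(e as Error)) {
      batchSize = Math.max(MIN_BATCH, Math.floor(batchSize / 2)); // back off on truncated or timed-out responses
    }
    throw e; // other errors propagate unchanged; the outer loop retries on its next tick
  }
}
```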
18 changes: 17 additions & 1 deletion dashboards/lodestar_execution_engine.json
@@ -1001,9 +1001,25 @@
"interval": "",
"legendFormat": "eth1_follow_distance_dynamic",
"refId": "A"
},
{
"exemplar": false,
"expr": "lodestar_eth1_blocks_batch_size_dynamic",
"hide": false,
"interval": "",
"legendFormat": "eth1_blocks_batch_size_dynamic",
"refId": "B"
},
{
"exemplar": false,
"expr": "lodestar_eth1_logs_batch_size_dynamic",
"hide": false,
"interval": "",
"legendFormat": "eth1_logs_batch_size_dynamic",
"refId": "C"
}
],
"title": "Eth1 Follow Distance Dynamic",
"title": "Eth1 Dynamic Stats",
"type": "timeseries"
},
{
67 changes: 54 additions & 13 deletions packages/beacon-node/src/eth1/eth1DepositDataTracker.ts
@@ -1,7 +1,8 @@
import {phase0, ssz} from "@lodestar/types";
import {IChainForkConfig} from "@lodestar/config";
import {BeaconStateAllForks, becomesNewEth1Data} from "@lodestar/state-transition";
import {ErrorAborted, fromHex, ILogger, isErrorAborted, sleep} from "@lodestar/utils";
import {ErrorAborted, TimeoutError, fromHex, ILogger, isErrorAborted, sleep} from "@lodestar/utils";

import {IBeaconDb} from "../db/index.js";
import {IMetrics} from "../metrics/index.js";
import {Eth1DepositsCache} from "./eth1DepositsCache.js";
@@ -12,9 +13,14 @@ import {Eth1DataAndDeposits, IEth1Provider} from "./interface.js";
import {Eth1Options} from "./options.js";
import {HttpRpcError} from "./provider/jsonRpcHttpClient.js";
import {parseEth1Block} from "./provider/eth1Provider.js";
import {isJsonRpcTruncatedError} from "./provider/utils.js";

const MAX_BLOCKS_PER_BLOCK_QUERY = 1000;
const MIN_BLOCKS_PER_BLOCK_QUERY = 10;

const MAX_BLOCKS_PER_LOG_QUERY = 1000;
const MIN_BLOCKS_PER_LOG_QUERY = 10;

/** Eth1 blocks happen approximately every 14s, no need to update too often once synced */
const AUTO_UPDATE_PERIOD_MS = 60 * 1000;
/** Prevent infinite loops */
@@ -53,7 +59,13 @@ export class Eth1DepositDataTracker {
private depositsCache: Eth1DepositsCache;
private eth1DataCache: Eth1DataCache;
private lastProcessedDepositBlockNumber: number | null = null;

/** Dynamically adjusted follow distance */
private eth1FollowDistance: number;
/** Dynamically adjusted batch size to fetch blocks */
private eth1GetBlocksBatchSizeDynamic = MAX_BLOCKS_PER_BLOCK_QUERY;
/** Dynamically adjusted batch size to fetch deposit logs */
private eth1GetLogsBatchSizeDynamic = MAX_BLOCKS_PER_LOG_QUERY;
private readonly forcedEth1DataVote: phase0.Eth1Data | null;

constructor(
@@ -81,16 +93,20 @@
if (metrics) {
// Set constant value once
metrics?.eth1.eth1FollowDistanceSecondsConfig.set(config.SECONDS_PER_ETH1_BLOCK * config.ETH1_FOLLOW_DISTANCE);
metrics.eth1.eth1FollowDistanceDynamic.addCollect(() =>
metrics.eth1.eth1FollowDistanceDynamic.set(this.eth1FollowDistance)
);
metrics.eth1.eth1FollowDistanceDynamic.addCollect(() => {
metrics.eth1.eth1FollowDistanceDynamic.set(this.eth1FollowDistance);
metrics.eth1.eth1GetBlocksBatchSizeDynamic.set(this.eth1GetBlocksBatchSizeDynamic);
metrics.eth1.eth1GetLogsBatchSizeDynamic.set(this.eth1GetLogsBatchSizeDynamic);
});
}

this.runAutoUpdate().catch((e: Error) => {
if (!(e instanceof ErrorAborted)) {
this.logger.error("Error on eth1 loop", {}, e);
}
});
if (opts.enabled) {
this.runAutoUpdate().catch((e: Error) => {
if (!(e instanceof ErrorAborted)) {
this.logger.error("Error on eth1 loop", {}, e);
}
});
}
}

/**
@@ -202,9 +218,22 @@
// The DB may contain deposits from a different chain making lastProcessedDepositBlockNumber > current chain tip
// The Math.min() fixes those rare scenarios where fromBlock > toBlock
const fromBlock = Math.min(remoteFollowBlock, this.getFromBlockToFetch(lastProcessedDepositBlockNumber));
const toBlock = Math.min(remoteFollowBlock, fromBlock + MAX_BLOCKS_PER_LOG_QUERY - 1);
const toBlock = Math.min(remoteFollowBlock, fromBlock + this.eth1GetLogsBatchSizeDynamic - 1);

let depositEvents;
try {
depositEvents = await this.eth1Provider.getDepositEvents(fromBlock, toBlock);
this.eth1GetLogsBatchSizeDynamic = Math.min(MAX_BLOCKS_PER_LOG_QUERY, this.eth1GetLogsBatchSizeDynamic * 2);
} catch (e) {
if (isJsonRpcTruncatedError(e as Error) || e instanceof TimeoutError) {
this.eth1GetLogsBatchSizeDynamic = Math.max(
MIN_BLOCKS_PER_LOG_QUERY,
Math.floor(this.eth1GetLogsBatchSizeDynamic / 2)
);
}
throw e;
}

const depositEvents = await this.eth1Provider.getDepositEvents(fromBlock, toBlock);
this.logger.verbose("Fetched deposits", {depositCount: depositEvents.length, fromBlock, toBlock});
this.metrics?.eth1.depositEventsFetched.inc(depositEvents.length);

@@ -253,11 +282,23 @@
);
const toBlock = Math.min(
remoteFollowBlock,
fromBlock + MAX_BLOCKS_PER_BLOCK_QUERY - 1, // Block range is inclusive
fromBlock + this.eth1GetBlocksBatchSizeDynamic - 1, // Block range is inclusive
lastProcessedDepositBlockNumber
);

const blocksRaw = await this.eth1Provider.getBlocksByNumber(fromBlock, toBlock);
let blocksRaw;
try {
blocksRaw = await this.eth1Provider.getBlocksByNumber(fromBlock, toBlock);
this.eth1GetBlocksBatchSizeDynamic = Math.min(MAX_BLOCKS_PER_BLOCK_QUERY, this.eth1GetBlocksBatchSizeDynamic * 2);
} catch (e) {
if (isJsonRpcTruncatedError(e as Error) || e instanceof TimeoutError) {
this.eth1GetBlocksBatchSizeDynamic = Math.max(
MIN_BLOCKS_PER_BLOCK_QUERY,
Math.floor(this.eth1GetBlocksBatchSizeDynamic / 2)
);
}
throw e;
}
const blocks = blocksRaw.map(parseEth1Block);

this.logger.verbose("Fetched eth1 blocks", {blockCount: blocks.length, fromBlock, toBlock});
60 changes: 10 additions & 50 deletions packages/beacon-node/src/eth1/provider/eth1Provider.ts
@@ -1,9 +1,8 @@
import {toHexString} from "@chainsafe/ssz";
import {phase0} from "@lodestar/types";
import {IChainConfig} from "@lodestar/config";
import {fromHex, retry} from "@lodestar/utils";
import {fromHex} from "@lodestar/utils";

import {chunkifyInclusiveRange} from "../../util/chunkify.js";
import {linspace} from "../../util/numpy.js";
import {depositEventTopics, parseDepositLog} from "../utils/depositContract.js";
import {Eth1Block, IEth1Provider} from "../interface.js";
@@ -77,32 +76,12 @@ export class Eth1Provider implements IEth1Provider {
}

async getDepositEvents(fromBlock: number, toBlock: number): Promise<phase0.DepositEvent[]> {
const logsRawArr = await retry(
(attempt) => {
// Large log requests can return with code 200 but truncated, with broken JSON
// This retry will split a given block range into smaller ranges exponentially
// The underlying http client should handle network errors and retry
const chunkCount = 2 ** (attempt - 1);
const blockRanges = chunkifyInclusiveRange(fromBlock, toBlock, chunkCount);
return Promise.all(
blockRanges.map(([from, to]) => {
const options = {
fromBlock: from,
toBlock: to,
address: this.depositContractAddress,
topics: depositEventTopics,
};
return this.getLogs(options);
})
);
},
{
retries: 3,
retryDelay: 3000,
shouldRetry: isJsonRpcTruncatedError,
}
);

const logsRawArr = await this.getLogs({
fromBlock,
toBlock,
address: this.depositContractAddress,
topics: depositEventTopics,
});
return logsRawArr.flat(1).map((log) => parseDepositLog(log));
}

@@ -111,29 +90,10 @@
*/
async getBlocksByNumber(fromBlock: number, toBlock: number): Promise<EthJsonRpcBlockRaw[]> {
const method = "eth_getBlockByNumber";
const blocksArr = await retry(
(attempt) => {
// Large batch requests can return with code 200 but truncated, with broken JSON
// This retry will split a given block range into smaller ranges exponentially
// The underlying http client should handle network errors and retry
const chunkCount = 2 ** (attempt - 1);
const blockRanges = chunkifyInclusiveRange(fromBlock, toBlock, chunkCount);
return Promise.all(
blockRanges.map(([from, to]) =>
this.rpc.fetchBatch<IEthJsonRpcReturnTypes[typeof method]>(
linspace(from, to).map((blockNumber) => ({method, params: [numToQuantity(blockNumber), false]})),
getBlocksByNumberOpts
)
)
);
},
{
retries: 3,
retryDelay: 3000,
shouldRetry: isJsonRpcTruncatedError,
}
const blocksArr = await this.rpc.fetchBatch<IEthJsonRpcReturnTypes[typeof method]>(
linspace(fromBlock, toBlock).map((blockNumber) => ({method, params: [numToQuantity(blockNumber), false]})),
getBlocksByNumberOpts
);

const blocks: EthJsonRpcBlockRaw[] = [];
for (const block of blocksArr.flat(1)) {
if (block) blocks.push(block);
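
With the adaptive range handled by the tracker, the provider no longer retries or chunks internally: `getDepositEvents` issues a single `eth_getLogs` request and `getBlocksByNumber` issues a single JSON-RPC batch of `eth_getBlockByNumber` calls for the requested range. As a rough illustration of what such a batch looks like on the wire (the actual payload is assembled by `rpc.fetchBatch`; the range helper below exists only for this example):

```ts
// Illustrative JSON-RPC 2.0 batch for blocks 0x10..0x13; one HTTP request carries all calls.
function inclusiveRange(from: number, to: number): number[] {
  return Array.from({length: to - from + 1}, (_, i) => from + i);
}

const batchPayload = inclusiveRange(0x10, 0x13).map((blockNumber, i) => ({
  jsonrpc: "2.0" as const,
  id: i,
  method: "eth_getBlockByNumber",
  // second param false = return transaction hashes only, not full transaction objects
  params: [`0x${blockNumber.toString(16)}`, false],
}));
```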
8 changes: 8 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
@@ -1078,6 +1078,14 @@ export function createLodestarMetrics(
name: "lodestar_eth1_follow_distance_dynamic",
help: "Eth1 dynamic follow distance changed by the deposit tracker if blocks are slow",
}),
eth1GetBlocksBatchSizeDynamic: register.gauge({
name: "lodestar_eth1_blocks_batch_size_dynamic",
help: "Dynamic batch size to fetch blocks",
}),
eth1GetLogsBatchSizeDynamic: register.gauge({
name: "lodestar_eth1_logs_batch_size_dynamic",
help: "Dynamic batch size to fetch deposit logs",
}),

// Merge Search info
eth1MergeStatus: register.gauge({
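
The two new gauges are sampled lazily: `addCollect` in the tracker registers a callback that runs at scrape time, so the metrics always report the current batch sizes without the tracker calling `set` on every adjustment. A sketch of the same pattern using prom-client directly (Lodestar wraps this in its own metrics register; this is not the wrapper's code):

```ts
import {Gauge, Registry} from "prom-client";

const registry = new Registry();

// Value owned elsewhere (in Lodestar, a private field on Eth1DepositDataTracker)
let eth1GetBlocksBatchSizeDynamic = 1000;

new Gauge({
  name: "lodestar_eth1_blocks_batch_size_dynamic",
  help: "Dynamic batch size to fetch blocks",
  registers: [registry],
  // prom-client invokes collect() on every scrape; `this` is the gauge instance
  collect() {
    this.set(eth1GetBlocksBatchSizeDynamic);
  },
});

// `await registry.metrics()` then reports the value current at scrape time
```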
93 changes: 93 additions & 0 deletions packages/beacon-node/test/unit/eth1/eth1DepositDataTracker.test.ts
@@ -0,0 +1,93 @@
import {expect} from "chai";
import sinon from "sinon";
import {config} from "@lodestar/config/default";
import {TimeoutError} from "@lodestar/utils";

import {Eth1DepositDataTracker} from "../../../src/eth1/eth1DepositDataTracker.js";
import {Eth1Provider} from "../../../src/eth1/provider/eth1Provider.js";
import {testLogger} from "../../utils/logger.js";
import {defaultEth1Options} from "../../../src/eth1/options.js";
import {BeaconDb} from "../../../src/db/beacon.js";

describe("Eth1DepositDataTracker", function () {
const sandbox = sinon.createSandbox();
const controller = new AbortController();

const logger = testLogger();
const opts = {...defaultEth1Options, enabled: false};
const signal = controller.signal;
const eth1Provider = new Eth1Provider(config, opts, signal, null);
const db = sinon.createStubInstance(BeaconDb);

const eth1DepositDataTracker = new Eth1DepositDataTracker(
opts,
{config, db, logger, signal, metrics: null},
eth1Provider
);
sinon
.stub(
(eth1DepositDataTracker as never) as {
getLastProcessedDepositBlockNumber: typeof eth1DepositDataTracker["getLastProcessedDepositBlockNumber"];
},
"getLastProcessedDepositBlockNumber"
)
.resolves(0);

sinon.stub(eth1DepositDataTracker["eth1DataCache"], "getHighestCachedBlockNumber").resolves(0);
sinon.stub(eth1DepositDataTracker["eth1DataCache"], "add").resolves(void 0);

sinon.stub(eth1DepositDataTracker["depositsCache"], "getEth1DataForBlocks").resolves([]);
sinon.stub(eth1DepositDataTracker["depositsCache"], "add").resolves(void 0);
sinon.stub(eth1DepositDataTracker["depositsCache"], "getLowestDepositEventBlockNumber").resolves(0);

const getBlocksByNumberStub = sinon.stub(eth1Provider, "getBlocksByNumber");
const getDepositEventsStub = sinon.stub(eth1Provider, "getDepositEvents");

after(() => {
sandbox.restore();
});

it("Should dynamically adjust blocks batch size", async function () {
let expectedSize = 1000;
expect(eth1DepositDataTracker["eth1GetBlocksBatchSizeDynamic"]).to.be.equal(expectedSize);

// If there are timeout errors or parse errors then the batch size should be reduced
getBlocksByNumberStub.throws(new TimeoutError("timeout error"));
for (let i = 0; i < 10; i++) {
expectedSize = Math.max(Math.floor(expectedSize / 2), 10);
await eth1DepositDataTracker["updateBlockCache"](3000).catch((_e) => void 0);
expect(eth1DepositDataTracker["eth1GetBlocksBatchSizeDynamic"]).to.be.equal(expectedSize);
}
expect(expectedSize).to.be.equal(10);

getBlocksByNumberStub.resolves([]);
for (let i = 0; i < 10; i++) {
expectedSize = Math.min(expectedSize * 2, 1000);
await eth1DepositDataTracker["updateBlockCache"](3000);
expect(eth1DepositDataTracker["eth1GetBlocksBatchSizeDynamic"]).to.be.equal(expectedSize);
}
expect(expectedSize).to.be.equal(1000);
});

it("Should dynamically adjust logs batch size", async function () {
let expectedSize = 1000;
expect(eth1DepositDataTracker["eth1GetLogsBatchSizeDynamic"]).to.be.equal(expectedSize);

// If there are timeout errors or parse errors then the batch size should be reduced
getDepositEventsStub.throws(new TimeoutError("timeout error"));
for (let i = 0; i < 10; i++) {
expectedSize = Math.max(Math.floor(expectedSize / 2), 10);
await eth1DepositDataTracker["updateDepositCache"](3000).catch((_e) => void 0);
expect(eth1DepositDataTracker["eth1GetLogsBatchSizeDynamic"]).to.be.equal(expectedSize);
}
expect(expectedSize).to.be.equal(10);

getDepositEventsStub.resolves([]);
for (let i = 0; i < 10; i++) {
expectedSize = Math.min(expectedSize * 2, 1000);
await eth1DepositDataTracker["updateDepositCache"](3000);
expect(eth1DepositDataTracker["eth1GetLogsBatchSizeDynamic"]).to.be.equal(expectedSize);
}
expect(expectedSize).to.be.equal(1000);
});
});
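
The loop bound of 10 in both tests is enough to pin the batch size at either limit: halving from 1000 with `Math.floor` and a floor of 10 takes 7 steps, and doubling from 10 with a cap of 1000 takes 7 steps, after which the value stays clamped. A quick check of those sequences, assuming the same constants as the tracker:

```ts
// Shrinking: 1000 → 500 → 250 → 125 → 62 → 31 → 15 → 10 (7 steps, then clamped at 10)
let size = 1000;
const shrink: number[] = [];
while (size > 10) {
  size = Math.max(10, Math.floor(size / 2));
  shrink.push(size);
}
console.log(shrink); // [500, 250, 125, 62, 31, 15, 10]

// Growing: 10 → 20 → 40 → 80 → 160 → 320 → 640 → 1000 (7 steps, then clamped at 1000)
const grow: number[] = [];
while (size < 1000) {
  size = Math.min(1000, size * 2);
  grow.push(size);
}
console.log(grow); // [20, 40, 80, 160, 320, 640, 1000]
```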
