diff --git a/packages/node-core/src/blockchain.service.ts b/packages/node-core/src/blockchain.service.ts index f899e7629e..ad187b1eba 100644 --- a/packages/node-core/src/blockchain.service.ts +++ b/packages/node-core/src/blockchain.service.ts @@ -2,10 +2,23 @@ // SPDX-License-Identifier: GPL-3.0 import {BaseCustomDataSource, BaseDataSource, IProjectNetworkConfig} from '@subql/types-core'; -import {DatasourceParams, Header, IBlock, ISubqueryProject} from './indexer'; +import {DatasourceParams, Header, IBaseIndexerWorker, IBlock, ISubqueryProject} from './indexer'; // TODO probably need to split this in 2 to have a worker specific subset +export interface ICoreBlockchainService< + DS extends BaseDataSource = BaseDataSource, + SubQueryProject extends ISubqueryProject = ISubqueryProject +> { + /* The semver of the node */ + packageVersion: string; + + // Project service + onProjectChange(project: SubQueryProject): Promise | void; + /* Not all networks have a block timestamp, e.g. Shiden */ + getBlockTimestamp(height: number): Promise; +} + export interface IBlockchainService< DS extends BaseDataSource = BaseDataSource, CDS extends DS & BaseCustomDataSource = BaseCustomDataSource & DS, @@ -13,20 +26,20 @@ export interface IBlockchainService< SafeAPI = any, LightBlock = any, FullBlock = any, -> { - /* The semver of the node */ - packageVersion: string; - + Worker extends IBaseIndexerWorker = IBaseIndexerWorker +> extends ICoreBlockchainService { blockHandlerKind: string; // TODO SubqueryProject methods // Block dispatcher service fetchBlocks(blockNums: number[]): Promise[] | IBlock[]>; // TODO this probably needs to change to get light block type correct + /* This is the worker equivalent of fetchBlocks, it provides a context to allow syncing anything between workers */ + fetchBlockWorker(worker: Worker, blockNum: number, context: {workers: Worker[]}): Promise; // Project service - onProjectChange(project: SubQueryProject): Promise | void; - /* Not all networks have a block timestamp, e.g. Shiden */ - getBlockTimestamp(height: number): Promise; + // onProjectChange(project: SubQueryProject): Promise | void; + // /* Not all networks have a block timestamp, e.g. Shiden */ + // getBlockTimestamp(height: number): Promise; // Fetch service /** diff --git a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts index 5ddb106d14..a6df6ab43d 100644 --- a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts @@ -27,6 +27,7 @@ export type ProcessBlockResponse = { }; export interface IBlockDispatcher { + init(onDynamicDsCreated: (height: number) => void): Promise; // now within enqueueBlock should handle getLatestBufferHeight enqueueBlocks(heights: (IBlock | number)[], latestBufferHeight: number): void | Promise; queueSize: number; diff --git a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.spec.ts b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.spec.ts index a1ce276196..749563b0ff 100644 --- a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.spec.ts +++ b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.spec.ts @@ -2,14 +2,19 @@ // SPDX-License-Identifier: GPL-3.0 import {EventEmitter2} from '@nestjs/event-emitter'; +import {IBlockchainService} from '../../blockchain.service'; import {IProjectUpgradeService, NodeConfig} from '../../configure'; +import {ConnectionPoolStateManager} from '../connectionPoolState.manager'; +import {DynamicDsService} from '../dynamic-ds.service'; +import {InMemoryCacheService} from '../inMemoryCache.service'; import {PoiSyncService} from '../poi'; import {StoreService} from '../store.service'; import {StoreCacheService} from '../storeCache'; import {IProjectService, ISubqueryProject} from '../types'; +import {UnfinalizedBlocksService} from '../unfinalizedBlocks.service'; import {WorkerBlockDispatcher} from './worker-block-dispatcher'; -class TestWorkerBlockDispatcher extends WorkerBlockDispatcher { +class TestWorkerBlockDispatcher extends WorkerBlockDispatcher { async fetchBlock(worker: any, height: number): Promise { return Promise.resolve(); } @@ -19,7 +24,7 @@ class TestWorkerBlockDispatcher extends WorkerBlockDispatcher { } } describe('WorkerBlockDispatcher', () => { - let dispatcher: WorkerBlockDispatcher; + let dispatcher: WorkerBlockDispatcher; // Mock workers const mockWorkers = [ @@ -36,9 +41,15 @@ describe('WorkerBlockDispatcher', () => { null as unknown as IProjectUpgradeService, null as unknown as StoreService, null as unknown as StoreCacheService, + null as unknown as InMemoryCacheService, null as unknown as PoiSyncService, + null as unknown as DynamicDsService, + null as unknown as UnfinalizedBlocksService, + null as unknown as ConnectionPoolStateManager, null as unknown as ISubqueryProject, - null as unknown as () => Promise + null as unknown as IBlockchainService, + '', + [] ); (dispatcher as any).workers = mockWorkers; }); diff --git a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts index 00b1e86c71..d921f99a80 100644 --- a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts @@ -2,14 +2,27 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; -import {OnApplicationShutdown} from '@nestjs/common'; +import {Inject, Injectable, OnApplicationShutdown} from '@nestjs/common'; import {EventEmitter2} from '@nestjs/event-emitter'; import {Interval} from '@nestjs/schedule'; +import {BaseDataSource} from '@subql/types-core'; import {last} from 'lodash'; +import {IApiConnectionSpecific} from '../../api.service'; +import {IBlockchainService} from '../../blockchain.service'; import {NodeConfig} from '../../configure'; import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service'; import {IndexerEvent} from '../../events'; -import {IBlock, PoiSyncService} from '../../indexer'; +import { + ConnectionPoolStateManager, + createIndexerWorker, + DynamicDsService, + IBaseIndexerWorker, + IBlock, + InMemoryCacheService, + PoiSyncService, + TerminateableWorker, + UnfinalizedBlocksService, +} from '../../indexer'; import {getLogger} from '../../logger'; import {monitorWrite} from '../../process'; import {AutoQueue, isTaskFlushedError} from '../../utils'; @@ -22,15 +35,6 @@ import {BaseBlockDispatcher} from './base-block-dispatcher'; const logger = getLogger('WorkerBlockDispatcherService'); -type Worker = { - processBlock: (height: number) => Promise; - getStatus: () => Promise; - getMemoryLeft: () => Promise; - getBlocksLoaded: () => Promise; - waitForWorkerBatchSize: (heapSizeInBytes: number) => Promise; - terminate: () => Promise; -}; - function initAutoQueue( workers: number | undefined, batchSize: number, @@ -41,27 +45,39 @@ function initAutoQueue( return new AutoQueue(workers * batchSize * 2, 1, timeout, name); } -export abstract class WorkerBlockDispatcher - extends BaseBlockDispatcher, DS, B> +@Injectable() +export class WorkerBlockDispatcher< + DS extends BaseDataSource = BaseDataSource, + Worker extends IBaseIndexerWorker = IBaseIndexerWorker, + Block = any, + ApiConn extends IApiConnectionSpecific = IApiConnectionSpecific + > + extends BaseBlockDispatcher, DS, Block> implements OnApplicationShutdown { - protected workers: W[] = []; + protected workers: TerminateableWorker[] = []; private numWorkers: number; private isShutdown = false; private currentWorkerIndex = 0; - protected abstract fetchBlock(worker: W, height: number): Promise; + private createWorker: () => Promise>; constructor( nodeConfig: NodeConfig, eventEmitter: EventEmitter2, - projectService: IProjectService, - projectUpgradeService: IProjectUpgradeService, + @Inject('IProjectService') projectService: IProjectService, + @Inject('IProjectUpgradeService') projectUpgradeService: IProjectUpgradeService, storeService: StoreService, storeCacheService: StoreCacheService, + cacheService: InMemoryCacheService, poiSyncService: PoiSyncService, - project: ISubqueryProject, - private createIndexerWorker: () => Promise, + dynamicDsService: DynamicDsService, + unfinalizedBlocksService: UnfinalizedBlocksService, + connectionPoolState: ConnectionPoolStateManager, + @Inject('ISubqueryProject') project: ISubqueryProject, + @Inject('IBlockchainService') private blockchainService: IBlockchainService, + workerPath: string, + workerFns: Parameters>[1], monitorService?: MonitorServiceInterface ) { super( @@ -76,13 +92,27 @@ export abstract class WorkerBlockDispatcher poiSyncService, monitorService ); + + this.createWorker = () => + createIndexerWorker( + workerPath, + workerFns, + storeService.getStore(), + cacheService.getCache(), + dynamicDsService, + unfinalizedBlocksService, + connectionPoolState, + project.root, + projectService.startHeight, + monitorService + ); // initAutoQueue will assert that workers is set. unfortunately we cant do anything before the super call // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.numWorkers = nodeConfig.workers!; } async init(onDynamicDsCreated: (height: number) => void): Promise { - this.workers = await Promise.all(new Array(this.numWorkers).fill(0).map(() => this.createIndexerWorker())); + this.workers = await Promise.all(new Array(this.numWorkers).fill(0).map(() => this.createWorker())); return super.init(onDynamicDsCreated); } @@ -96,7 +126,8 @@ export abstract class WorkerBlockDispatcher await Promise.all(this.workers.map((w) => w.terminate())); } } - async enqueueBlocks(heights: (IBlock | number)[], latestBufferHeight?: number): Promise { + + async enqueueBlocks(heights: (IBlock | number)[], latestBufferHeight?: number): Promise { assert( heights.every((h) => typeof h === 'number'), 'Worker block dispatcher only supports enqueuing numbers, not blocks.' @@ -147,7 +178,7 @@ export abstract class WorkerBlockDispatcher await worker.waitForWorkerBatchSize(this.minimumHeapLimit); - const pendingBlock = this.fetchBlock(worker, height); + const pendingBlock = this.blockchainService.fetchBlockWorker(worker, height, {workers: this.workers}); const processBlock = async () => { try { diff --git a/packages/node-core/src/indexer/project.service.ts b/packages/node-core/src/indexer/project.service.ts index e605ab5dc8..5576579109 100644 --- a/packages/node-core/src/indexer/project.service.ts +++ b/packages/node-core/src/indexer/project.service.ts @@ -8,7 +8,7 @@ import {EventEmitter2} from '@nestjs/event-emitter'; import {BaseDataSource, IProjectNetworkConfig} from '@subql/types-core'; import {Sequelize} from '@subql/x-sequelize'; import {IApi} from '../api.service'; -import {IBlockchainService} from '../blockchain.service'; +import {IBlockchainService, ICoreBlockchainService} from '../blockchain.service'; import {IProjectUpgradeService, NodeConfig} from '../configure'; import {IndexerEvent} from '../events'; import {getLogger} from '../logger'; @@ -35,7 +35,7 @@ class NotInitError extends Error { export class ProjectService< DS extends BaseDataSource = BaseDataSource, API extends IApi = IApi, - UnfinalizedBlocksService extends IUnfinalizedBlocksService = IUnfinalizedBlocksService, + UnfinalizedBlocksService extends IUnfinalizedBlocksService = IUnfinalizedBlocksService > implements IProjectService { private _schema?: string; @@ -55,7 +55,7 @@ export class ProjectService< private readonly dynamicDsService: DynamicDsService, private eventEmitter: EventEmitter2, @Inject('IUnfinalizedBlocksService') private readonly unfinalizedBlockService: UnfinalizedBlocksService, - @Inject('IBlockchainService') private blockchainService: IBlockchainService + @Inject('IBlockchainService') private blockchainService: ICoreBlockchainService ) { if (this.nodeConfig.unfinalizedBlocks && this.nodeConfig.allowSchemaMigration) { throw new Error('Unfinalized Blocks and Schema Migration cannot be enabled at the same time'); diff --git a/packages/node-core/src/indexer/worker/worker.ts b/packages/node-core/src/indexer/worker/worker.ts index 7384d8f65c..93ddea8e42 100644 --- a/packages/node-core/src/indexer/worker/worker.ts +++ b/packages/node-core/src/indexer/worker/worker.ts @@ -172,6 +172,8 @@ export function createWorkerHost< ); } +export type TerminateableWorker = T & {terminate: () => Promise}; + export async function createIndexerWorker< T extends IBaseIndexerWorker, ApiConnection extends IApiConnectionSpecific /*ApiPromiseConnection*/ /*ApiPromiseConnection*/, @@ -189,7 +191,7 @@ export async function createIndexerWorker< startHeight: number, monitorService?: MonitorServiceInterface, workerData?: any -): Promise Promise}> { +): Promise> { const indexerWorker = Worker.create< T & {initWorker: (startHeight: number) => Promise}, DefaultWorkerFunctions diff --git a/packages/node/src/blockchain.service.ts b/packages/node/src/blockchain.service.ts index 8d1788e29d..d0a2281b7a 100644 --- a/packages/node/src/blockchain.service.ts +++ b/packages/node/src/blockchain.service.ts @@ -7,6 +7,7 @@ import { isCustomDs, isRuntimeDs } from '@subql/common-substrate'; import { DatasourceParams, Header, + IBaseIndexerWorker, IBlock, IBlockchainService, mainThreadOnly, @@ -30,6 +31,7 @@ import { isFullBlock, LightBlockContent, } from './indexer/types'; +import { IIndexerWorker } from './indexer/worker/worker'; import { calcInterval, getBlockByHeight, @@ -55,13 +57,13 @@ export class BlockchainService SubqueryProject, ApiAt, LightBlockContent, - BlockContent + BlockContent, + IIndexerWorker > { constructor( @Inject('APIService') private apiService: ApiService, - @Inject(isMainThread ? RuntimeService : 'Null') - private runtimeService: RuntimeService, + @Inject('RuntimeService') private runtimeService: RuntimeService, ) {} isCustomDs = isCustomDs; @@ -69,6 +71,7 @@ export class BlockchainService blockHandlerKind = SubstrateHandlerKind.Block; packageVersion = packageVersion; + @mainThreadOnly() async fetchBlocks( blockNums: number[], ): Promise[] | IBlock[]> { @@ -84,6 +87,29 @@ export class BlockchainService ); } + async fetchBlockWorker( + worker: IIndexerWorker, + height: number, + context: { workers: IIndexerWorker[] }, + ): Promise { + // get SpecVersion from main runtime service + const { blockSpecVersion, syncedDictionary } = + await this.runtimeService.getSpecVersion(height); + + // if main runtime specVersion has been updated, then sync with all workers specVersion map, and lastFinalizedBlock + if (syncedDictionary) { + context.workers.map((w) => + w.syncRuntimeService( + this.runtimeService.specVersionMap, + this.runtimeService.latestFinalizedHeight, + ), + ); + } + + // const start = new Date(); + await worker.fetchBlock(height, blockSpecVersion); + } + async onProjectChange(project: SubqueryProject): Promise { // Only network with chainTypes require to reload await this.apiService.updateChainTypes(); @@ -98,8 +124,9 @@ export class BlockchainService async getFinalizedHeader(): Promise
{ const finalizedHash = await this.apiService.unsafeApi.rpc.chain.getFinalizedHead(); - const finalizedHeader = - await this.apiService.unsafeApi.rpc.chain.getHeader(finalizedHash); + const finalizedHeader = await this.apiService.unsafeApi.rpc.chain.getHeader( + finalizedHash, + ); return substrateHeaderToHeader(finalizedHeader); } diff --git a/packages/node/src/indexer/blockDispatcher/index.ts b/packages/node/src/indexer/blockDispatcher/index.ts deleted file mode 100644 index 0081a692eb..0000000000 --- a/packages/node/src/indexer/blockDispatcher/index.ts +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors -// SPDX-License-Identifier: GPL-3.0 - -// import { BlockDispatcherService } from './block-dispatcher.service'; -import { WorkerBlockDispatcherService } from './worker-block-dispatcher.service'; - -export { WorkerBlockDispatcherService }; diff --git a/packages/node/src/indexer/blockDispatcher/substrate-block-dispatcher.ts b/packages/node/src/indexer/blockDispatcher/substrate-block-dispatcher.ts deleted file mode 100644 index a81b4e6b03..0000000000 --- a/packages/node/src/indexer/blockDispatcher/substrate-block-dispatcher.ts +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors -// SPDX-License-Identifier: GPL-3.0 - -import { IBlockDispatcher } from '@subql/node-core'; -import { SubstrateBlock } from '@subql/types'; -import { RuntimeService } from '../runtime/runtimeService'; - -export interface ISubstrateBlockDispatcher - extends IBlockDispatcher { - init( - onDynamicDsCreated: (height: number) => void, - runtimeService?: RuntimeService, - ): Promise; -} diff --git a/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts b/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts deleted file mode 100644 index 98ece59a80..0000000000 --- a/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors -// SPDX-License-Identifier: GPL-3.0 - -import assert from 'assert'; -import path from 'path'; -import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { - NodeConfig, - StoreService, - StoreCacheService, - IProjectService, - WorkerBlockDispatcher, - ConnectionPoolStateManager, - IProjectUpgradeService, - PoiSyncService, - InMemoryCacheService, - createIndexerWorker as createIndexerWorkerCore, - MonitorServiceInterface, - UnfinalizedBlocksService, - DynamicDsService, -} from '@subql/node-core'; -import { SubstrateDatasource } from '@subql/types'; -import { SubqueryProject } from '../../configure/SubqueryProject'; -import { ApiPromiseConnection } from '../apiPromise.connection'; -import { RuntimeService } from '../runtime/runtimeService'; -import { BlockContent, LightBlockContent } from '../types'; -import { IIndexerWorker } from '../worker/worker'; - -type IndexerWorker = IIndexerWorker & { - terminate: () => Promise; -}; - -@Injectable() -export class WorkerBlockDispatcherService - extends WorkerBlockDispatcher< - SubstrateDatasource, - IndexerWorker, - BlockContent | LightBlockContent - > - implements OnApplicationShutdown -{ - private _runtimeService?: RuntimeService; - - constructor( - nodeConfig: NodeConfig, - eventEmitter: EventEmitter2, - @Inject('IProjectService') - projectService: IProjectService, - @Inject('IProjectUpgradeService') - projectUpgadeService: IProjectUpgradeService, - cacheService: InMemoryCacheService, - storeService: StoreService, - storeCacheService: StoreCacheService, - poiSyncService: PoiSyncService, - @Inject('ISubqueryProject') project: SubqueryProject, - dynamicDsService: DynamicDsService, - unfinalizedBlocksService: UnfinalizedBlocksService< - BlockContent | LightBlockContent - >, - connectionPoolState: ConnectionPoolStateManager, - monitorService?: MonitorServiceInterface, - ) { - super( - nodeConfig, - eventEmitter, - projectService, - projectUpgadeService, - storeService, - storeCacheService, - poiSyncService, - project, - () => - createIndexerWorkerCore< - IIndexerWorker, - ApiPromiseConnection, - BlockContent, - SubstrateDatasource - >( - path.resolve(__dirname, '../../../dist/indexer/worker/worker.js'), - ['syncRuntimeService', 'getSpecFromMap'], - storeService.getStore(), - cacheService.getCache(), - dynamicDsService, - unfinalizedBlocksService, - connectionPoolState, - project.root, - projectService.startHeight, - monitorService, - ), - monitorService, - ); - } - - private get runtimeService(): RuntimeService { - assert(this._runtimeService, 'RuntimeService not initialized'); - return this._runtimeService; - } - - async init( - onDynamicDsCreated: (height: number) => void, - runtimeService?: RuntimeService, - ): Promise { - await super.init(onDynamicDsCreated); - // Sync workers runtime from main - if (runtimeService) this._runtimeService = runtimeService; - this.syncWorkerRuntimes(); - } - - syncWorkerRuntimes(): void { - this.workers.map((w) => - w.syncRuntimeService( - this.runtimeService.specVersionMap, - this.runtimeService.latestFinalizedHeight, - ), - ); - } - - protected async fetchBlock( - worker: IndexerWorker, - height: number, - ): Promise { - // get SpecVersion from main runtime service - const { blockSpecVersion, syncedDictionary } = - await this.runtimeService.getSpecVersion(height); - // if main runtime specVersion has been updated, then sync with all workers specVersion map, and lastFinalizedBlock - if (syncedDictionary) { - this.syncWorkerRuntimes(); - } - - // const start = new Date(); - await worker.fetchBlock(height, blockSpecVersion); - // const end = new Date(); - - // const waitTime = end.getTime() - start.getTime(); - // if (waitTime > 1000) { - // logger.info( - // `Waiting to fetch block ${height}: ${chalk.red(`${waitTime}ms`)}`, - // ); - // } else if (waitTime > 200) { - // logger.info( - // `Waiting to fetch block ${height}: ${chalk.yellow(`${waitTime}ms`)}`, - // ); - // } - } -} diff --git a/packages/node/src/indexer/fetch.module.ts b/packages/node/src/indexer/fetch.module.ts index 1eac85f17b..59b7515611 100644 --- a/packages/node/src/indexer/fetch.module.ts +++ b/packages/node/src/indexer/fetch.module.ts @@ -1,6 +1,7 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 +import path from 'path'; import { Module } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { @@ -19,17 +20,19 @@ import { DsProcessorService, ProjectService, DynamicDsService, + WorkerBlockDispatcher, } from '@subql/node-core'; import { SubstrateDatasource } from '@subql/types'; import { BlockchainService } from '../blockchain.service'; import { SubqueryProject } from '../configure/SubqueryProject'; import { ApiService } from './api.service'; import { ApiPromiseConnection } from './apiPromise.connection'; -import { WorkerBlockDispatcherService } from './blockDispatcher'; import { SubstrateDictionaryService } from './dictionary/substrateDictionary.service'; import { FetchService } from './fetch.service'; import { IndexerManager } from './indexer.manager'; import { RuntimeService } from './runtime/runtimeService'; +import { BlockContent, LightBlockContent } from './types'; +import { IIndexerWorker } from './worker/worker'; @Module({ imports: [CoreModule], @@ -45,7 +48,7 @@ import { RuntimeService } from './runtime/runtimeService'; ], }, { - provide: RuntimeService, // TODO DOING this because of circular reference with dictionary service + provide: 'RuntimeService', // TODO DOING this because of circular reference with dictionary service useFactory: (apiService: ApiService) => new RuntimeService(apiService), inject: ['APIService'], }, @@ -86,19 +89,27 @@ import { RuntimeService } from './runtime/runtimeService'; monitorService?: MonitorService, ) => { return nodeConfig.workers - ? new WorkerBlockDispatcherService( + ? new WorkerBlockDispatcher< + SubstrateDatasource, + IIndexerWorker, + BlockContent | LightBlockContent, + ApiPromiseConnection + >( nodeConfig, eventEmitter, projectService, projectUpgradeService, - cacheService, storeService, storeCacheService, + cacheService, poiSyncService, - project, dynamicDsService, unfinalizedBlocks, connectionPoolState, + project, + blockchainService, + path.resolve(__dirname, '../../dist/indexer/worker/worker.js'), + ['syncRuntimeService', 'getSpecFromMap'], monitorService, ) : new BlockDispatcher( diff --git a/packages/node/src/indexer/fetch.service.ts b/packages/node/src/indexer/fetch.service.ts index e5f69ceaf1..93cd0969ee 100644 --- a/packages/node/src/indexer/fetch.service.ts +++ b/packages/node/src/indexer/fetch.service.ts @@ -10,18 +10,18 @@ import { StoreCacheService, UnfinalizedBlocksService, ProjectService, + IBlockDispatcher, } from '@subql/node-core'; import { SubstrateDatasource, SubstrateBlock } from '@subql/types'; import { BlockchainService } from '../blockchain.service'; import { SubqueryProject } from '../configure/SubqueryProject'; -import { ISubstrateBlockDispatcher } from './blockDispatcher/substrate-block-dispatcher'; import { SubstrateDictionaryService } from './dictionary/substrateDictionary.service'; import { RuntimeService } from './runtime/runtimeService'; @Injectable() export class FetchService extends BaseFetchService< SubstrateDatasource, - ISubstrateBlockDispatcher, + IBlockDispatcher, SubstrateBlock > { constructor( @@ -30,13 +30,13 @@ export class FetchService extends BaseFetchService< projectService: ProjectService, @Inject('ISubqueryProject') project: SubqueryProject, @Inject('IBlockDispatcher') - blockDispatcher: ISubstrateBlockDispatcher, + blockDispatcher: IBlockDispatcher, dictionaryService: SubstrateDictionaryService, @Inject('IUnfinalizedBlocksService') unfinalizedBlocksService: UnfinalizedBlocksService, eventEmitter: EventEmitter2, schedulerRegistry: SchedulerRegistry, - private runtimeService: RuntimeService, + @Inject('RuntimeService') private runtimeService: RuntimeService, storeCacheService: StoreCacheService, @Inject('IBlockchainService') blockchainService: BlockchainService, ) { @@ -55,10 +55,7 @@ export class FetchService extends BaseFetchService< } protected async initBlockDispatcher(): Promise { - await this.blockDispatcher.init( - this.resetForNewDs.bind(this), - this.runtimeService, - ); + await this.blockDispatcher.init(this.resetForNewDs.bind(this)); } protected async preLoopHook({ diff --git a/packages/node/src/indexer/worker/worker-fetch.module.ts b/packages/node/src/indexer/worker/worker-fetch.module.ts index 39ce924fd0..12be91c47b 100644 --- a/packages/node/src/indexer/worker/worker-fetch.module.ts +++ b/packages/node/src/indexer/worker/worker-fetch.module.ts @@ -39,7 +39,12 @@ import { WorkerService } from './worker.service'; provide: 'IProjectService', useClass: ProjectService, }, - WorkerRuntimeService, + // This is alised so it satisfies the BlockchainService, other services are updated to reflect this + // TODO find a way to remove the alias, currently theres no common interface between worker and non-worker + { + provide: 'RuntimeService', + useClass: WorkerRuntimeService, + }, { provide: 'IBlockchainService', useClass: BlockchainService, diff --git a/packages/node/src/indexer/worker/worker.service.ts b/packages/node/src/indexer/worker/worker.service.ts index 00fe2318ff..57b6ffe3dc 100644 --- a/packages/node/src/indexer/worker/worker.service.ts +++ b/packages/node/src/indexer/worker/worker.service.ts @@ -29,6 +29,7 @@ export class WorkerService extends BaseWorkerService< constructor( @Inject('APIService') private apiService: ApiService, private indexerManager: IndexerManager, + @Inject('RuntimeService') private workerRuntimeService: WorkerRuntimeService, @Inject('IProjectService') projectService: IProjectService,