diff --git a/packages/node/src/configure/SubqueryProject.ts b/packages/node/src/configure/SubqueryProject.ts index aafd529ff..e0372f617 100644 --- a/packages/node/src/configure/SubqueryProject.ts +++ b/packages/node/src/configure/SubqueryProject.ts @@ -2,8 +2,11 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; +import fs from 'fs'; +import os from 'os'; +import path from 'path'; import { Injectable } from '@nestjs/common'; -import { LocalReader, makeTempDir, validateSemver } from '@subql/common'; +import { LocalReader, validateSemver } from '@subql/common'; import { CosmosProjectNetworkConfig, parseCosmosProjectManifest, @@ -101,15 +104,12 @@ export class SubqueryProject implements ISubqueryProject { NOT_SUPPORT('<1.0.0'); } - const fileCacheDir = await getFileCacheDir(reader, root); - return loadProjectFromManifestBase( manifest.asV1_0_0, reader, path, root, networkOverrides, - fileCacheDir, ); } } @@ -117,10 +117,20 @@ export class SubqueryProject implements ISubqueryProject { async function getFileCacheDir( reader: Reader, projectRoot: string, + chainId: string, ): Promise { if (isTmpDir(projectRoot)) return projectRoot; - if (reader instanceof LocalReader) return makeTempDir(); - + if (reader instanceof LocalReader) { + const tmpDir = path.join(os.tmpdir(), `kyveTmpFileCache_${chainId}`); + try { + await fs.promises.mkdir(tmpDir); + } catch (e) { + if (e.code === 'EEXIST') { + return tmpDir; + } + } + return tmpDir; + } return projectRoot; } @@ -132,7 +142,6 @@ async function loadProjectFromManifestBase( path: string, root: string, networkOverrides?: Partial, - fileCacheDir?: string, ): Promise { if (typeof projectManifest.network.endpoint === 'string') { projectManifest.network.endpoint = [projectManifest.network.endpoint]; @@ -183,6 +192,8 @@ async function loadProjectFromManifestBase( ), ); + const fileCacheDir = await getFileCacheDir(reader, root, network.chainId); + return new SubqueryProject( reader.root ? reader.root : path, //TODO, need to method to get project_id root, diff --git a/packages/node/src/indexer/api.service.ts b/packages/node/src/indexer/api.service.ts index 16d393e4a..bc705219e 100644 --- a/packages/node/src/indexer/api.service.ts +++ b/packages/node/src/indexer/api.service.ts @@ -53,7 +53,7 @@ export class ApiService { private fetchBlocksBatches = CosmosUtil.fetchBlocksBatches; private nodeConfig: CosmosNodeConfig; - private kyveApi: KyveApi; + private kyveApi?: KyveApi; registry: Registry; constructor( @@ -111,6 +111,7 @@ export class ApiService ); if (this.nodeConfig.kyveEndpoint) { + console.log(this.project.fileCacheDir); try { this.kyveApi = await KyveApi.create( network.chainId, @@ -120,9 +121,7 @@ export class ApiService this.project.fileCacheDir, ); } catch (e) { - logger.warn( - `Failed to use kyve for network: ${network.chainId}, resolving to rpc`, - ); + logger.warn(`${e}`); } } diff --git a/packages/node/src/utils/kyve/kyve.ts b/packages/node/src/utils/kyve/kyve.ts index 37117aa3b..03a7edd33 100644 --- a/packages/node/src/utils/kyve/kyve.ts +++ b/packages/node/src/utils/kyve/kyve.ts @@ -198,22 +198,13 @@ export class KyveApi { private async updateCurrentBundleAndDetails( height: number, ): Promise { - if (this.cachedBundleDetails.length === 0) { + let bundle = this.getBundleFromCache(height); + if (!bundle) { const bundleId = await this.getBundleId(height); - const bundleDetail = await this.getBundleById(bundleId); - this.addToCachedBundle(bundleDetail); - } - - const bundle = this.getBundleFromCache(height); - if (bundle) { - return JSON.parse(await this.getBundleData(bundle)); - } else { - const bundleId = await this.getBundleId(height); - const newBundleDetails = await this.getBundleById(bundleId); - - this.addToCachedBundle(newBundleDetails); - return JSON.parse(await this.getBundleData(newBundleDetails)); + bundle = await this.getBundleById(bundleId); + this.addToCachedBundle(bundle); } + return JSON.parse(await this.getBundleData(bundle)); } private async pollUntilReadable(bundleFilePath: string): Promise { @@ -241,9 +232,7 @@ export class KyveApi { try { await new Promise((resolve, reject) => { writeStream.on('open', resolve); - writeStream.on('error', (err) => { - reject(err); - }); + writeStream.on('error', reject); }); const zippedBundleData = await this.retrieveBundleData(bundle.storage_id); @@ -260,10 +249,9 @@ export class KyveApi { .on('finish', resolve); }); } catch (e) { - if (axios.isAxiosError(e)) { + if (!['EEXIST', 'EACCES', 'ENOENT'].includes(e.code)) { await fs.promises.unlink(bundleFilePath); } - throw e; }