Skip to content

Commit

Permalink
refactor based on review, update tmpDir, update error handling on str…
Browse files Browse the repository at this point in the history
…eam, tidy up
  • Loading branch information
bz888 committed Apr 17, 2024
1 parent 3ba4fa1 commit 491989d
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 30 deletions.
25 changes: 18 additions & 7 deletions packages/node/src/configure/SubqueryProject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// SPDX-License-Identifier: GPL-3.0

import assert from 'assert';
import fs from 'fs';
import os from 'os';
import path from 'path';
import { Injectable } from '@nestjs/common';
import { LocalReader, makeTempDir, validateSemver } from '@subql/common';
import { LocalReader, validateSemver } from '@subql/common';
import {
CosmosProjectNetworkConfig,
parseCosmosProjectManifest,
Expand Down Expand Up @@ -101,26 +104,33 @@ export class SubqueryProject implements ISubqueryProject {
NOT_SUPPORT('<1.0.0');
}

const fileCacheDir = await getFileCacheDir(reader, root);

return loadProjectFromManifestBase(
manifest.asV1_0_0,
reader,
path,
root,
networkOverrides,
fileCacheDir,
);
}
}

async function getFileCacheDir(
reader: Reader,
projectRoot: string,
chainId: string,
): Promise<string> {
if (isTmpDir(projectRoot)) return projectRoot;
if (reader instanceof LocalReader) return makeTempDir();

if (reader instanceof LocalReader) {
const tmpDir = path.join(os.tmpdir(), `kyveTmpFileCache_${chainId}`);
try {
await fs.promises.mkdir(tmpDir);
} catch (e) {
if (e.code === 'EEXIST') {
return tmpDir;
}
}
return tmpDir;
}
return projectRoot;
}

Expand All @@ -132,7 +142,6 @@ async function loadProjectFromManifestBase(
path: string,
root: string,
networkOverrides?: Partial<CosmosProjectNetworkConfig>,
fileCacheDir?: string,
): Promise<SubqueryProject> {
if (typeof projectManifest.network.endpoint === 'string') {
projectManifest.network.endpoint = [projectManifest.network.endpoint];
Expand Down Expand Up @@ -183,6 +192,8 @@ async function loadProjectFromManifestBase(
),
);

const fileCacheDir = await getFileCacheDir(reader, root, network.chainId);

return new SubqueryProject(
reader.root ? reader.root : path, //TODO, need to method to get project_id
root,
Expand Down
7 changes: 3 additions & 4 deletions packages/node/src/indexer/api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export class ApiService
{
private fetchBlocksBatches = CosmosUtil.fetchBlocksBatches;
private nodeConfig: CosmosNodeConfig;
private kyveApi: KyveApi;
private kyveApi?: KyveApi;
registry: Registry;

constructor(
Expand Down Expand Up @@ -111,6 +111,7 @@ export class ApiService
);

if (this.nodeConfig.kyveEndpoint) {
console.log(this.project.fileCacheDir);
try {
this.kyveApi = await KyveApi.create(
network.chainId,
Expand All @@ -120,9 +121,7 @@ export class ApiService
this.project.fileCacheDir,
);
} catch (e) {
logger.warn(
`Failed to use kyve for network: ${network.chainId}, resolving to rpc`,
);
logger.warn(`${e}`);
}
}

Expand Down
26 changes: 7 additions & 19 deletions packages/node/src/utils/kyve/kyve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,22 +198,13 @@ export class KyveApi {
private async updateCurrentBundleAndDetails(
height: number,
): Promise<KyveBundleData[]> {
if (this.cachedBundleDetails.length === 0) {
let bundle = this.getBundleFromCache(height);
if (!bundle) {
const bundleId = await this.getBundleId(height);
const bundleDetail = await this.getBundleById(bundleId);
this.addToCachedBundle(bundleDetail);
}

const bundle = this.getBundleFromCache(height);
if (bundle) {
return JSON.parse(await this.getBundleData(bundle));
} else {
const bundleId = await this.getBundleId(height);
const newBundleDetails = await this.getBundleById(bundleId);

this.addToCachedBundle(newBundleDetails);
return JSON.parse(await this.getBundleData(newBundleDetails));
bundle = await this.getBundleById(bundleId);
this.addToCachedBundle(bundle);
}
return JSON.parse(await this.getBundleData(bundle));
}

private async pollUntilReadable(bundleFilePath: string): Promise<string> {
Expand Down Expand Up @@ -241,9 +232,7 @@ export class KyveApi {
try {
await new Promise((resolve, reject) => {
writeStream.on('open', resolve);
writeStream.on('error', (err) => {
reject(err);
});
writeStream.on('error', reject);
});

const zippedBundleData = await this.retrieveBundleData(bundle.storage_id);
Expand All @@ -260,10 +249,9 @@ export class KyveApi {
.on('finish', resolve);
});
} catch (e) {
if (axios.isAxiosError(e)) {
if (!['EEXIST', 'EACCES', 'ENOENT'].includes(e.code)) {
await fs.promises.unlink(bundleFilePath);
}

throw e;
}

Expand Down

0 comments on commit 491989d

Please sign in to comment.