From 1e0dee546f4a5003491111423eb1468437d48ae0 Mon Sep 17 00:00:00 2001 From: Emma Zhu Date: Thu, 1 Feb 2024 10:49:28 +0800 Subject: [PATCH] Fix issue of 'too many handles' error when downloading a large file --- src/common/persistence/FSExtentStore.ts | 29 +++----- src/common/persistence/FileLazyReadStream.ts | 72 ++++++++++++++++++++ tests/blob/blockblob.highlevel.test.ts | 10 +-- 3 files changed, 88 insertions(+), 23 deletions(-) create mode 100644 src/common/persistence/FileLazyReadStream.ts diff --git a/src/common/persistence/FSExtentStore.ts b/src/common/persistence/FSExtentStore.ts index 199a8012d..36a4d7170 100644 --- a/src/common/persistence/FSExtentStore.ts +++ b/src/common/persistence/FSExtentStore.ts @@ -1,6 +1,5 @@ import { close, - createReadStream, createWriteStream, fdatasync, mkdir, @@ -30,6 +29,7 @@ import IExtentStore, { } from "./IExtentStore"; import IOperationQueue from "./IOperationQueue"; import OperationQueue from "./OperationQueue"; +import FileLazyReadStream from "./FileLazyReadStream"; const statAsync = promisify(stat); const mkdirAsync = promisify(mkdir); @@ -333,26 +333,19 @@ export default class FSExtentStore implements IExtentStore { const op = () => new Promise((resolve, reject) => { this.logger.verbose( - `FSExtentStore:readExtent() Creating read stream. LocationId:${persistencyId} extentId:${ - extentChunk.id - } path:${path} offset:${extentChunk.offset} count:${ - extentChunk.count + `FSExtentStore:readExtent() Creating read stream. LocationId:${persistencyId} extentId:${extentChunk.id + } path:${path} offset:${extentChunk.offset} count:${extentChunk.count } end:${extentChunk.offset + extentChunk.count - 1}`, contextId ); - const stream = createReadStream(path, { - start: extentChunk.offset, - end: extentChunk.offset + extentChunk.count - 1 - }).on("close", () => { - this.logger.verbose( - `FSExtentStore:readExtent() Read stream closed. 
LocationId:${persistencyId} extentId:${ - extentChunk.id - } path:${path} offset:${extentChunk.offset} count:${ - extentChunk.count - } end:${extentChunk.offset + extentChunk.count - 1}`, - contextId - ); - }); + const stream = new FileLazyReadStream( + path, + extentChunk.offset, + extentChunk.offset + extentChunk.count - 1, + this.logger, + persistencyId, + extentChunk.id, + contextId); resolve(stream); }); diff --git a/src/common/persistence/FileLazyReadStream.ts b/src/common/persistence/FileLazyReadStream.ts new file mode 100644 index 000000000..dbe57853a --- /dev/null +++ b/src/common/persistence/FileLazyReadStream.ts @@ -0,0 +1,72 @@ +import { ReadStream, createReadStream } from "fs"; +import { Readable } from "stream"; +import ILogger from "../ILogger"; + + +export default class FileLazyReadStream extends Readable { + private extentStream: ReadStream | undefined; + constructor( + private readonly extentPath: string, + private readonly start: number, + private readonly end: number, + private readonly logger: ILogger, + private readonly persistencyId: string, + private readonly extentId: string, + private readonly contextId?: string) { + super(); + } + + public _read(): void { + if (this.extentStream === undefined) { + this.extentStream = createReadStream(this.extentPath, { + start: this.start, + end: this.end + }).on("close", () => { + this.logger.verbose( + `FSExtentStore:readExtent() Read stream closed. 
LocationId:${this.persistencyId} extentId:${this.extentId
+          } path:${this.extentPath} offset:${this.start} end:${this.end}`,
+          this.contextId
+        );
+      });
+      this.setSourceEventHandlers();
+    }
+    this.extentStream?.resume();
+  }
+
+  private setSourceEventHandlers() {
+    this.extentStream?.on("data", this.sourceDataHandler);
+    this.extentStream?.on("end", this.sourceErrorOrEndHandler);
+    this.extentStream?.on("error", this.sourceErrorOrEndHandler);
+  }
+
+  private removeSourceEventHandlers() {
+    this.extentStream?.removeListener("data", this.sourceDataHandler);
+    this.extentStream?.removeListener("end", this.sourceErrorOrEndHandler);
+    this.extentStream?.removeListener("error", this.sourceErrorOrEndHandler);
+  }
+
+  private sourceDataHandler = (data: Buffer) => {
+    if (!this.push(data)) {
+      this.extentStream?.pause();
+    }
+  }
+
+  private sourceErrorOrEndHandler = (err?: Error) => {
+    if (err && err.name === "AbortError") {
+      this.destroy(err);
+      return;
+    }
+
+    this.removeSourceEventHandlers();
+    this.push(null);
+    this.destroy(err);
+  }
+
+  _destroy(error: Error | null, callback: (error?: Error) => void): void {
+    // remove listener from source and release source
+    //this.removeSourceEventHandlers();
+    this.extentStream?.destroy();
+
+    callback(error === null ? 
undefined : error);
+  }
+}
\ No newline at end of file
diff --git a/tests/blob/blockblob.highlevel.test.ts b/tests/blob/blockblob.highlevel.test.ts
index f2057eeed..ec0d04011 100644
--- a/tests/blob/blockblob.highlevel.test.ts
+++ b/tests/blob/blockblob.highlevel.test.ts
@@ -21,7 +21,7 @@ import {
 } from "../testutils";
 
 // Set true to enable debug log
 configLogger(false);
 
 // tslint:disable:no-empty
 describe("BlockBlobHighlevel", () => {
@@ -179,7 +179,7 @@ describe("BlockBlobHighlevel", () => {
           aborter.abort();
         }
       });
-    } catch (err) {}
+    } catch (err) { }
 
     assert.ok(eventTriggered);
   }).timeout(timeoutForLargeFileUploadingTest);
@@ -198,7 +198,7 @@ describe("BlockBlobHighlevel", () => {
           aborter.abort();
         }
       });
-    } catch (err) {}
+    } catch (err) { }
 
     assert.ok(eventTriggered);
   });
@@ -260,7 +260,7 @@ describe("BlockBlobHighlevel", () => {
         abortSignal: AbortController.timeout(1)
       });
       assert.fail();
-    } catch (err:any) {
+    } catch (err: any) {
       assert.ok((err.message as string).toLowerCase().includes("abort"));
     }
   }).timeout(timeoutForLargeFileUploadingTest);
@@ -314,7 +314,7 @@ describe("BlockBlobHighlevel", () => {
           aborter.abort();
         }
       });
-    } catch (err) {}
+    } catch (err) { }
 
     assert.ok(eventTriggered);
   }).timeout(timeoutForLargeFileUploadingTest);