From 7fb3980ce126c6b18e7a507dc1be66d034b54dca Mon Sep 17 00:00:00 2001 From: "F. Wimmer" Date: Sat, 10 Aug 2024 15:48:18 +0000 Subject: [PATCH 01/18] feat: add streaming to JSON.parse --- examples/readers/package.json | 3 +- packages/llamaindex/package.json | 1 + packages/llamaindex/src/readers/JSONReader.ts | 79 ++++++++++++++++++- pnpm-lock.yaml | 17 +++- 4 files changed, 94 insertions(+), 6 deletions(-) diff --git a/examples/readers/package.json b/examples/readers/package.json index eb545b8579..4944aa1fbc 100644 --- a/examples/readers/package.json +++ b/examples/readers/package.json @@ -14,7 +14,8 @@ "start:assemblyai": "node --import tsx ./src/assemblyai.ts", "start:llamaparse-dir": "node --import tsx ./src/simple-directory-reader-with-llamaparse.ts", "start:llamaparse-json": "node --import tsx ./src/llamaparse-json.ts", - "start:discord": "node --import tsx ./src/discord.ts" + "start:discord": "node --import tsx ./src/discord.ts", + "start:json": "node --import tsx ./src/json.ts" }, "dependencies": { "llamaindex": "*" diff --git a/packages/llamaindex/package.json b/packages/llamaindex/package.json index 5bbddcbdfc..86a4227484 100644 --- a/packages/llamaindex/package.json +++ b/packages/llamaindex/package.json @@ -36,6 +36,7 @@ "@mixedbread-ai/sdk": "^2.2.11", "@pinecone-database/pinecone": "^2.2.2", "@qdrant/js-client-rest": "^1.9.0", + "@streamparser/json-whatwg": "^0.0.21", "@types/lodash": "^4.17.4", "@types/node": "^20.14.5", "@types/papaparse": "^5.3.14", diff --git a/packages/llamaindex/src/readers/JSONReader.ts b/packages/llamaindex/src/readers/JSONReader.ts index 2f4f99a9f0..31e1edcfbb 100644 --- a/packages/llamaindex/src/readers/JSONReader.ts +++ b/packages/llamaindex/src/readers/JSONReader.ts @@ -1,6 +1,32 @@ import type { JSONValue } from "@llamaindex/core/global"; import { Document, FileReader } from "@llamaindex/core/schema"; +import { JSONParser} from "@streamparser/json-whatwg"; + export interface JSONReaderOptions { + /** + * The threshold for 
using streaming mode. + * Give the approximate size of the JSON data in MB. Estimates character length by calculating: "(streamingThreshold * 1024 * 1024) / 2" and comparing against string.length + * Streaming mode avoids memory issues when parsing large JSON data. Set "undefined" to disable streaming. Set "0" to always use streaming. + * + * @default 100 MB + */ + streamingThreshold?: number; + + /** + * The size of the buffer used to store strings. Passthrough parameter of "@streamparser/json-whatwg" + * Useful for edge evnironments, see https://github.com/juanjoDiaz/streamparser-json/tree/main/packages/whatwg for more details. + * + * @default undefined + */ + stringBufferSize?: number; + + /** + * The size of the buffer used to store numbers. Passthrough parameter of "@streamparser/json-whatwg + * Useful for edge evnironments, see https://github.com/juanjoDiaz/streamparser-json/tree/main/packages/whatwg for more details. + * + * @default undefined + */ + numberBufferSize?: number; /** * Whether to ensure only ASCII characters. * Converts non-ASCII characters to their unicode escape sequence. 
@@ -51,6 +77,7 @@ export class JSONReader extends FileReader { constructor(options: JSONReaderOptions = {}) { super(); this.options = { + streamingThreshold: 100, ensureAscii: false, isJsonLines: false, cleanJson: true, @@ -59,13 +86,16 @@ export class JSONReader extends FileReader { this.validateOptions(); } private validateOptions(): void { - const { levelsBack, collapseLength } = this.options; + const { levelsBack, collapseLength, streamingThreshold } = this.options; if (levelsBack !== undefined && levelsBack < 0) { throw new JSONReaderError("levelsBack must not be negative"); } if (collapseLength !== undefined && collapseLength < 0) { throw new JSONReaderError("collapseLength must not be negative"); } + if (streamingThreshold !== undefined && streamingThreshold < 0) { + throw new JSONReaderError("streamingThreshold must not be negative"); + } } /** @@ -76,15 +106,56 @@ export class JSONReader extends FileReader { */ async loadDataAsContent(content: Uint8Array): Promise { const jsonStr = new TextDecoder("utf-8").decode(content); - const parser = this.parseJsonString(jsonStr); + + const limit = ((this.options.streamingThreshold ?? 
Infinity) * 1024 * 1024) / 2; + + + if (jsonStr.length > (limit)) { + console.log(`Using streaming to parse JSON as character length exceeds calculated limit: "${limit}"`); + const stream = new ReadableStream({ + async start(controller) { + controller.enqueue(content); + controller.close(); + } + }) + return await this.streamParseJsonString(stream); + } else { + const parser = this.parseJsonString(jsonStr); + const documents: Document[] = []; + + for await (const data of parser) { + documents.push(await this.createDocument(data)); + } + return documents; + } + } + + private async streamParseJsonString(stream: ReadableStream): Promise { + const parser = new JSONParser({paths: ['$'], stringBufferSize: this.options.stringBufferSize, numberBufferSize: this.options.numberBufferSize}); + const reader = stream.pipeThrough(parser).getReader(); const documents: Document[] = []; - for await (const data of parser) { - documents.push(await this.createDocument(data)); + while (true) { + const { done, value: parsedElementInfo } = await reader.read(); + if (done) break; + + const { value, partial } = parsedElementInfo; + + if (!partial) { + if (Array.isArray(value)) { + for (const item of value) { + documents.push(await this.createDocument(item as T)); + } + } + documents.push(await this.createDocument(value as T)); } + + } + return documents; } + private async *parseJsonString(jsonStr: string): AsyncGenerator { if (this.options.isJsonLines) { yield* this.parseJsonLines(jsonStr); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 96574d0ef3..377b8c122d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -515,6 +515,9 @@ importers: '@qdrant/js-client-rest': specifier: ^1.9.0 version: 1.10.0(typescript@5.5.3) + '@streamparser/json-whatwg': + specifier: ^0.0.21 + version: 0.0.21 '@types/lodash': specifier: ^4.17.4 version: 4.17.6 @@ -568,7 +571,7 @@ importers: version: 2.0.0 mongodb: specifier: ^6.7.0 - version: 
6.8.0(@aws-sdk/credential-providers@3.613.0(@aws-sdk/client-sso-oidc@3.613.0(@aws-sdk/client-sts@3.613.0))) + version: 6.8.0(@aws-sdk/credential-providers@3.613.0) notion-md-crawler: specifier: ^1.0.0 version: 1.0.0(encoding@0.1.13) @@ -3642,6 +3645,12 @@ packages: resolution: {integrity: sha512-4pP0EV3iTsexDx+8PPGAKCQpd/6hsQBaQhqWzU4hqKPHN5epPsxKbvUTIiYIHTxaKt6/kEaqPBpu/ufvfbrRzw==} engines: {node: '>=16.0.0'} + '@streamparser/json-whatwg@0.0.21': + resolution: {integrity: sha512-XMn/aAz5lIwtY63ssMWgSdTCnV5jkSceu58StvBIZQ1cM54lgbVaYv6jhM+Z0HhydUDFwtsE4s7X/LgaLbThWQ==} + + '@streamparser/json@0.0.21': + resolution: {integrity: sha512-v+49JBiG1kmc/9Ug79Lz9wyKaRocBgCnpRaLpdy7p0d3ICKtOAfc/H/Epa1j3F6YdnzjnZKKrnJ8xnh/v1P8Aw==} + '@svgr/babel-plugin-add-jsx-attribute@8.0.0': resolution: {integrity: sha512-b9MIk7yhdS1pMCZM8VeNfUlSKVRhsHZNMl5O9SfaX0l0t5wjdgu4IDzGB8bpnGBBOjGST3rRFVsaaEtI4W6f7g==} engines: {node: '>=14'} @@ -14981,6 +14990,12 @@ snapshots: '@smithy/types': 3.3.0 tslib: 2.6.3 + '@streamparser/json-whatwg@0.0.21': + dependencies: + '@streamparser/json': 0.0.21 + + '@streamparser/json@0.0.21': {} + '@svgr/babel-plugin-add-jsx-attribute@8.0.0(@babel/core@7.24.7)': dependencies: '@babel/core': 7.24.7 From b8c1dd9d08444431e275192e73bf861b0e439cf7 Mon Sep 17 00:00:00 2001 From: "F. 
Wimmer" Date: Sat, 10 Aug 2024 15:50:49 +0000 Subject: [PATCH 02/18] clean up --- packages/llamaindex/src/readers/JSONReader.ts | 46 +++++++++++-------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/packages/llamaindex/src/readers/JSONReader.ts b/packages/llamaindex/src/readers/JSONReader.ts index 31e1edcfbb..642caf2983 100644 --- a/packages/llamaindex/src/readers/JSONReader.ts +++ b/packages/llamaindex/src/readers/JSONReader.ts @@ -1,15 +1,15 @@ import type { JSONValue } from "@llamaindex/core/global"; import { Document, FileReader } from "@llamaindex/core/schema"; -import { JSONParser} from "@streamparser/json-whatwg"; +import { JSONParser } from "@streamparser/json-whatwg"; export interface JSONReaderOptions { /** - * The threshold for using streaming mode. - * Give the approximate size of the JSON data in MB. Estimates character length by calculating: "(streamingThreshold * 1024 * 1024) / 2" and comparing against string.length - * Streaming mode avoids memory issues when parsing large JSON data. Set "undefined" to disable streaming. Set "0" to always use streaming. - * - * @default 100 MB - */ + * The threshold for using streaming mode. + * Give the approximate size of the JSON data in MB. Estimates character length by calculating: "(streamingThreshold * 1024 * 1024) / 2" and comparing against string.length + * Streaming mode avoids memory issues when parsing large JSON data. Set "undefined" to disable streaming. Set "0" to always use streaming. + * + * @default 100 MB + */ streamingThreshold?: number; /** @@ -106,23 +106,25 @@ export class JSONReader extends FileReader { */ async loadDataAsContent(content: Uint8Array): Promise { const jsonStr = new TextDecoder("utf-8").decode(content); - - const limit = ((this.options.streamingThreshold ?? 
Infinity) * 1024 * 1024) / 2; - - if (jsonStr.length > (limit)) { - console.log(`Using streaming to parse JSON as character length exceeds calculated limit: "${limit}"`); + const limit = + ((this.options.streamingThreshold ?? Infinity) * 1024 * 1024) / 2; + + if (jsonStr.length > limit) { + console.log( + `Using streaming to parse JSON as character length exceeds calculated limit: "${limit}"`, + ); const stream = new ReadableStream({ async start(controller) { controller.enqueue(content); controller.close(); - } - }) + }, + }); return await this.streamParseJsonString(stream); } else { const parser = this.parseJsonString(jsonStr); const documents: Document[] = []; - + for await (const data of parser) { documents.push(await this.createDocument(data)); } @@ -130,8 +132,14 @@ export class JSONReader extends FileReader { } } - private async streamParseJsonString(stream: ReadableStream): Promise { - const parser = new JSONParser({paths: ['$'], stringBufferSize: this.options.stringBufferSize, numberBufferSize: this.options.numberBufferSize}); + private async streamParseJsonString( + stream: ReadableStream, + ): Promise { + const parser = new JSONParser({ + paths: ["$"], + stringBufferSize: this.options.stringBufferSize, + numberBufferSize: this.options.numberBufferSize, + }); const reader = stream.pipeThrough(parser).getReader(); const documents: Document[] = []; @@ -148,14 +156,12 @@ export class JSONReader extends FileReader { } } documents.push(await this.createDocument(value as T)); - } - + } } return documents; } - private async *parseJsonString(jsonStr: string): AsyncGenerator { if (this.options.isJsonLines) { yield* this.parseJsonLines(jsonStr); From 4e7df88e33df5dd8712203772d4810c488942c35 Mon Sep 17 00:00:00 2001 From: "F. 
Wimmer" Date: Sat, 10 Aug 2024 23:59:04 +0000 Subject: [PATCH 03/18] refactor --- packages/llamaindex/package.json | 2 +- packages/llamaindex/src/readers/JSONReader.ts | 162 ++++++++++-------- pnpm-lock.yaml | 24 +-- 3 files changed, 96 insertions(+), 92 deletions(-) diff --git a/packages/llamaindex/package.json b/packages/llamaindex/package.json index 86a4227484..9c88773c7c 100644 --- a/packages/llamaindex/package.json +++ b/packages/llamaindex/package.json @@ -25,6 +25,7 @@ "@azure/identity": "^4.2.1", "@datastax/astra-db-ts": "^1.2.1", "@discordjs/rest": "^2.3.0", + "@discoveryjs/json-ext": "^0.6.1", "@google-cloud/vertexai": "^1.2.0", "@google/generative-ai": "0.12.0", "@grpc/grpc-js": "^1.10.11", @@ -36,7 +37,6 @@ "@mixedbread-ai/sdk": "^2.2.11", "@pinecone-database/pinecone": "^2.2.2", "@qdrant/js-client-rest": "^1.9.0", - "@streamparser/json-whatwg": "^0.0.21", "@types/lodash": "^4.17.4", "@types/node": "^20.14.5", "@types/papaparse": "^5.3.14", diff --git a/packages/llamaindex/src/readers/JSONReader.ts b/packages/llamaindex/src/readers/JSONReader.ts index 642caf2983..76dc40c830 100644 --- a/packages/llamaindex/src/readers/JSONReader.ts +++ b/packages/llamaindex/src/readers/JSONReader.ts @@ -1,35 +1,21 @@ +import { parseChunked } from "@discoveryjs/json-ext"; import type { JSONValue } from "@llamaindex/core/global"; import { Document, FileReader } from "@llamaindex/core/schema"; -import { JSONParser } from "@streamparser/json-whatwg"; export interface JSONReaderOptions { /** * The threshold for using streaming mode. * Give the approximate size of the JSON data in MB. Estimates character length by calculating: "(streamingThreshold * 1024 * 1024) / 2" and comparing against string.length - * Streaming mode avoids memory issues when parsing large JSON data. Set "undefined" to disable streaming. Set "0" to always use streaming. + * Streaming mode avoids memory issues when parsing large JSON data. 
Set "undefined" to disable streaming or "0" to always use streaming. * * @default 100 MB */ streamingThreshold?: number; - /** - * The size of the buffer used to store strings. Passthrough parameter of "@streamparser/json-whatwg" - * Useful for edge evnironments, see https://github.com/juanjoDiaz/streamparser-json/tree/main/packages/whatwg for more details. - * - * @default undefined - */ - stringBufferSize?: number; - - /** - * The size of the buffer used to store numbers. Passthrough parameter of "@streamparser/json-whatwg - * Useful for edge evnironments, see https://github.com/juanjoDiaz/streamparser-json/tree/main/packages/whatwg for more details. - * - * @default undefined - */ - numberBufferSize?: number; /** * Whether to ensure only ASCII characters. * Converts non-ASCII characters to their unicode escape sequence. + * * @default false */ ensureAscii?: boolean; @@ -37,6 +23,8 @@ export interface JSONReaderOptions { /** * Whether the JSON is in JSON Lines format. * Split into lines, remove empty lines, parse each line as JSON. + * Note: Uses a custom streaming parser, most likely less robust than json-ext + * * @default false */ isJsonLines?: boolean; @@ -44,6 +32,7 @@ export interface JSONReaderOptions { /** * Whether to clean the JSON by filtering out structural characters (`{}, [], and ,`). * If set to false, it will just parse the JSON, not removing structural characters. + * * @default true */ cleanJson?: boolean; @@ -51,6 +40,7 @@ export interface JSONReaderOptions { /** * Specifies how many levels up the JSON structure to include in the output. cleanJson will be ignored. * If set to 0, all levels are included. If undefined, parses the entire JSON and treats each line as an embedding. + * * @default undefined */ levelsBack?: number; @@ -58,6 +48,7 @@ export interface JSONReaderOptions { /** * The maximum length of JSON string representation to be collapsed into a single line. * Only applicable when `levelsBack` is set. 
+ * * @default undefined */ collapseLength?: number; @@ -105,68 +96,79 @@ export class JSONReader extends FileReader { * @return {Promise} A Promise that resolves to an array of Document objects. */ async loadDataAsContent(content: Uint8Array): Promise { - const jsonStr = new TextDecoder("utf-8").decode(content); + const documents: Document[] = []; + + const parser = this.parseJson(content); + for await (const document of parser) { + documents.push(document); + } + return documents; + } + + private async *parseJson(content: Uint8Array): AsyncGenerator { + const jsonStr = new TextDecoder("utf-8").decode(content); const limit = ((this.options.streamingThreshold ?? Infinity) * 1024 * 1024) / 2; + let parsedData: AsyncGenerator; + if (jsonStr.length > limit) { + parsedData = this.parseJsonStream(content); console.log( - `Using streaming to parse JSON as character length exceeds calculated limit: "${limit}"`, + `Using streaming parser as estimated character length exceeds: "${limit}"`, ); - const stream = new ReadableStream({ - async start(controller) { - controller.enqueue(content); - controller.close(); - }, - }); - return await this.streamParseJsonString(stream); } else { - const parser = this.parseJsonString(jsonStr); - const documents: Document[] = []; + parsedData = this.parseJsonString(jsonStr); + console.log( + `Using "JSON.parse" as estimated character length is less than: "${limit}"`, + ); + } - for await (const data of parser) { - documents.push(await this.createDocument(data)); + for await (const value of parsedData) { + if (!this.options.cleanJson || !Array.isArray(value)) { + yield await this.createDocument(value as T); + } else { + for (const item of value) { + yield await this.createDocument(item as T); + } } - return documents; } } - private async streamParseJsonString( - stream: ReadableStream, - ): Promise { - const parser = new JSONParser({ - paths: ["$"], - stringBufferSize: this.options.stringBufferSize, - numberBufferSize: 
this.options.numberBufferSize, - }); - const reader = stream.pipeThrough(parser).getReader(); - const documents: Document[] = []; - - while (true) { - const { done, value: parsedElementInfo } = await reader.read(); - if (done) break; - - const { value, partial } = parsedElementInfo; - - if (!partial) { - if (Array.isArray(value)) { - for (const item of value) { - documents.push(await this.createDocument(item as T)); - } + private async *parseJsonString(jsonStr: string): AsyncGenerator { + try { + if (this.options.isJsonLines) { + for await (const value of this.parseJsonLines(jsonStr)) { + yield value; } - documents.push(await this.createDocument(value as T)); + } else { + const parsedData = JSON.parse(jsonStr); + yield parsedData; } + } catch (e) { + throw new JSONParseError(`Error parsing JSON: ${e} in "${jsonStr}"`); } - - return documents; } - private async *parseJsonString(jsonStr: string): AsyncGenerator { + private async *parseJsonStream( + content: Uint8Array, + ): AsyncGenerator { + const stream = new ReadableStream({ + async start(controller) { + controller.enqueue(content); + controller.close(); + }, + }); + if (this.options.isJsonLines) { - yield* this.parseJsonLines(jsonStr); - } else { - yield* this.parseJson(jsonStr); + yield* this.parseJsonLinesStream(stream); + return; + } + const parser = await parseChunked(stream); + + for await (const value of parser) { + yield value; } } @@ -185,25 +187,33 @@ export class JSONReader extends FileReader { } } - private async *parseJson(jsonStr: string): AsyncGenerator { - try { - // TODO: Add streaming to handle large JSON files - const parsedData = JSON.parse(jsonStr); + private async *parseJsonLinesStream( + stream: ReadableStream, + ): AsyncGenerator { + const reader = stream.getReader(); + let buffer = ""; - if (!this.options.cleanJson) { - // Yield the parsed data directly if cleanJson is false - yield parsedData; - } else if (Array.isArray(parsedData)) { - // Check if it's an Array, if so yield each item 
seperately, i.e. create a document per top-level array of the json - for (const item of parsedData) { - yield item; + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += new TextDecoder("utf-8").decode(value); + + const lines = buffer.split("\n"); + + // Keep the last partial line in the buffer + buffer = lines.pop() || ""; + + for (const line of lines) { + if (line.trim() !== "") { + yield JSON.parse(line.trim()); } - } else { - // If not an array, just yield the parsed data - yield parsedData; } - } catch (e) { - throw new JSONParseError(`Error parsing JSON: ${e} in "${jsonStr}"`); + } + + // Parse any remaining content in the buffer + if (buffer.trim() !== "") { + yield JSON.parse(buffer.trim()); } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 377b8c122d..f801a35bf6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -482,6 +482,9 @@ importers: '@discordjs/rest': specifier: ^2.3.0 version: 2.3.0 + '@discoveryjs/json-ext': + specifier: ^0.6.1 + version: 0.6.1 '@google-cloud/vertexai': specifier: ^1.2.0 version: 1.3.0(encoding@0.1.13) @@ -515,9 +518,6 @@ importers: '@qdrant/js-client-rest': specifier: ^1.9.0 version: 1.10.0(typescript@5.5.3) - '@streamparser/json-whatwg': - specifier: ^0.0.21 - version: 0.0.21 '@types/lodash': specifier: ^4.17.4 version: 4.17.6 @@ -2021,6 +2021,10 @@ packages: resolution: {integrity: sha512-dBVuXR082gk3jsFp7Rd/JI4kytwGHecnCoTtXFb7DB6CNHp4rg5k1bhg0nWdLGLnOV71lmDzGQaLMy8iPLY0pw==} engines: {node: '>=10.0.0'} + '@discoveryjs/json-ext@0.6.1': + resolution: {integrity: sha512-boghen8F0Q8D+0/Q1/1r6DUEieUJ8w2a1gIknExMSHBsJFOr2+0KUfHiVYBvucPwl3+RU5PFBK833FjFCh3BhA==} + engines: {node: '>=14.17.0'} + '@docsearch/css@3.6.0': resolution: {integrity: sha512-+sbxb71sWre+PwDK7X2T8+bhS6clcVMLwBPznX45Qu6opJcgRjAp7gYSDzVFp187J+feSj5dNBN1mJoi6ckkUQ==} @@ -3645,12 +3649,6 @@ packages: resolution: {integrity: 
sha512-4pP0EV3iTsexDx+8PPGAKCQpd/6hsQBaQhqWzU4hqKPHN5epPsxKbvUTIiYIHTxaKt6/kEaqPBpu/ufvfbrRzw==} engines: {node: '>=16.0.0'} - '@streamparser/json-whatwg@0.0.21': - resolution: {integrity: sha512-XMn/aAz5lIwtY63ssMWgSdTCnV5jkSceu58StvBIZQ1cM54lgbVaYv6jhM+Z0HhydUDFwtsE4s7X/LgaLbThWQ==} - - '@streamparser/json@0.0.21': - resolution: {integrity: sha512-v+49JBiG1kmc/9Ug79Lz9wyKaRocBgCnpRaLpdy7p0d3ICKtOAfc/H/Epa1j3F6YdnzjnZKKrnJ8xnh/v1P8Aw==} - '@svgr/babel-plugin-add-jsx-attribute@8.0.0': resolution: {integrity: sha512-b9MIk7yhdS1pMCZM8VeNfUlSKVRhsHZNMl5O9SfaX0l0t5wjdgu4IDzGB8bpnGBBOjGST3rRFVsaaEtI4W6f7g==} engines: {node: '>=14'} @@ -13021,6 +13019,8 @@ snapshots: '@discoveryjs/json-ext@0.5.7': {} + '@discoveryjs/json-ext@0.6.1': {} + '@docsearch/css@3.6.0': {} '@docsearch/react@3.6.0(@algolia/client-search@4.24.0)(@types/react@18.3.3)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(search-insights@2.15.0)': @@ -14990,12 +14990,6 @@ snapshots: '@smithy/types': 3.3.0 tslib: 2.6.3 - '@streamparser/json-whatwg@0.0.21': - dependencies: - '@streamparser/json': 0.0.21 - - '@streamparser/json@0.0.21': {} - '@svgr/babel-plugin-add-jsx-attribute@8.0.0(@babel/core@7.24.7)': dependencies: '@babel/core': 7.24.7 From fed6cf864e1a3e379e5a18f569dc26dd2055df04 Mon Sep 17 00:00:00 2001 From: "F. 
Wimmer" Date: Sun, 11 Aug 2024 00:59:49 +0000 Subject: [PATCH 04/18] cleanup --- packages/llamaindex/src/readers/JSONReader.ts | 46 +++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/packages/llamaindex/src/readers/JSONReader.ts b/packages/llamaindex/src/readers/JSONReader.ts index 76dc40c830..4c7300fde6 100644 --- a/packages/llamaindex/src/readers/JSONReader.ts +++ b/packages/llamaindex/src/readers/JSONReader.ts @@ -52,6 +52,7 @@ export interface JSONReaderOptions { * @default undefined */ collapseLength?: number; + verbose?: boolean; } export class JSONReaderError extends Error {} @@ -72,9 +73,13 @@ export class JSONReader extends FileReader { ensureAscii: false, isJsonLines: false, cleanJson: true, + verbose: false, ...options, }; this.validateOptions(); + this.log( + `JSONReader initialized with options: ${JSON.stringify(this.options)}`, + ); } private validateOptions(): void { const { levelsBack, collapseLength, streamingThreshold } = this.options; @@ -111,19 +116,14 @@ export class JSONReader extends FileReader { const limit = ((this.options.streamingThreshold ?? Infinity) * 1024 * 1024) / 2; - let parsedData: AsyncGenerator; + const parsedData = + jsonStr.length > limit + ? this.parseJsonStream(content) + : this.parseJsonString(jsonStr); - if (jsonStr.length > limit) { - parsedData = this.parseJsonStream(content); - console.log( - `Using streaming parser as estimated character length exceeds: "${limit}"`, - ); - } else { - parsedData = this.parseJsonString(jsonStr); - console.log( - `Using "JSON.parse" as estimated character length is less than: "${limit}"`, - ); - } + this.log( + `Using ${jsonStr.length > limit ? "streaming parser" : "JSON.parse"} as estimated character length ${jsonStr.length > limit ? 
"exceeds" : "is less than"}: "${limit}"`, + ); for await (const value of parsedData) { if (!this.options.cleanJson || !Array.isArray(value)) { @@ -140,6 +140,7 @@ export class JSONReader extends FileReader { try { if (this.options.isJsonLines) { for await (const value of this.parseJsonLines(jsonStr)) { + // Yield each JSON line separately if in JSON Lines format yield value; } } else { @@ -162,9 +163,11 @@ export class JSONReader extends FileReader { }); if (this.options.isJsonLines) { + this.log("Parsing JSON Stream in JSON Lines format"); yield* this.parseJsonLinesStream(stream); return; } + this.log("Parsing JSON Stream"); const parser = await parseChunked(stream); for await (const value of parser) { @@ -191,28 +194,29 @@ export class JSONReader extends FileReader { stream: ReadableStream, ): AsyncGenerator { const reader = stream.getReader(); + const decoder = new TextDecoder("utf-8"); let buffer = ""; while (true) { const { done, value } = await reader.read(); if (done) break; - buffer += new TextDecoder("utf-8").decode(value); - + // Decode the streamed data incrementally + buffer += decoder.decode(value, { stream: true }); const lines = buffer.split("\n"); - // Keep the last partial line in the buffer + // Keep the incomplete line in the buffer buffer = lines.pop() || ""; for (const line of lines) { - if (line.trim() !== "") { + if (line.trim()) { yield JSON.parse(line.trim()); } } } - // Parse any remaining content in the buffer - if (buffer.trim() !== "") { + // Parse any remaining data in the buffer + if (buffer.trim()) { yield JSON.parse(buffer.trim()); } } @@ -238,6 +242,7 @@ export class JSONReader extends FileReader { private async prepareDepthFirstYield(data: T): Promise { const levelsBack = this.options.levelsBack ?? 0; const results: string[] = []; + for await (const value of this.depthFirstYield( data, levelsBack === 0 ? 
Infinity : levelsBack, @@ -388,4 +393,9 @@ export class JSONReader extends FileReader { (char) => `\\u${char.charCodeAt(0).toString(16).padStart(4, "0")}`, ); } + private log(message: string): void { + if (this.options.verbose) { + console.log(message); + } + } } From 6811ed7b9527dff0f11e8f22ac38015621229021 Mon Sep 17 00:00:00 2001 From: "F. Wimmer" Date: Sun, 11 Aug 2024 03:16:27 +0000 Subject: [PATCH 05/18] cr --- packages/llamaindex/src/readers/JSONReader.ts | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/packages/llamaindex/src/readers/JSONReader.ts b/packages/llamaindex/src/readers/JSONReader.ts index 4c7300fde6..d7b610b969 100644 --- a/packages/llamaindex/src/readers/JSONReader.ts +++ b/packages/llamaindex/src/readers/JSONReader.ts @@ -8,7 +8,7 @@ export interface JSONReaderOptions { * Give the approximate size of the JSON data in MB. Estimates character length by calculating: "(streamingThreshold * 1024 * 1024) / 2" and comparing against string.length * Streaming mode avoids memory issues when parsing large JSON data. Set "undefined" to disable streaming or "0" to always use streaming. * - * @default 100 MB + * @default 50 MB */ streamingThreshold?: number; @@ -52,6 +52,11 @@ export interface JSONReaderOptions { * @default undefined */ collapseLength?: number; + /** + * Whether to enable verbose logging. + * + * @default false + */ verbose?: boolean; } @@ -69,7 +74,7 @@ export class JSONReader extends FileReader { constructor(options: JSONReaderOptions = {}) { super(); this.options = { - streamingThreshold: 100, + streamingThreshold: 50, ensureAscii: false, isJsonLines: false, cleanJson: true, @@ -122,7 +127,7 @@ export class JSONReader extends FileReader { : this.parseJsonString(jsonStr); this.log( - `Using ${jsonStr.length > limit ? "streaming parser" : "JSON.parse"} as estimated character length ${jsonStr.length > limit ? "exceeds" : "is less than"}: "${limit}"`, + `Using ${jsonStr.length > limit ? 
"streaming parser" : "JSON.parse"} as string length ${jsonStr.length > limit ? "exceeds" : "is less than"} calculated character limit: "${limit}"`, ); for await (const value of parsedData) { @@ -139,11 +144,13 @@ export class JSONReader extends FileReader { private async *parseJsonString(jsonStr: string): AsyncGenerator { try { if (this.options.isJsonLines) { + this.log("Parsing JSON String in JSON Lines format"); for await (const value of this.parseJsonLines(jsonStr)) { // Yield each JSON line separately if in JSON Lines format yield value; } } else { + this.log("Parsing JSON String"); const parsedData = JSON.parse(jsonStr); yield parsedData; } @@ -168,11 +175,7 @@ export class JSONReader extends FileReader { return; } this.log("Parsing JSON Stream"); - const parser = await parseChunked(stream); - - for await (const value of parser) { - yield value; - } + yield await parseChunked(stream); } private async *parseJsonLines(jsonStr: string): AsyncGenerator { From d20dd9998a7b4ba80a487d1dba5e17217b5fafc7 Mon Sep 17 00:00:00 2001 From: "F. Wimmer" Date: Sun, 11 Aug 2024 03:17:40 +0000 Subject: [PATCH 06/18] docs: update --- apps/docs/docs/modules/data_loaders/json.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/docs/docs/modules/data_loaders/json.md b/apps/docs/docs/modules/data_loaders/json.md index c12767c05d..584851c7bb 100644 --- a/apps/docs/docs/modules/data_loaders/json.md +++ b/apps/docs/docs/modules/data_loaders/json.md @@ -2,6 +2,7 @@ A simple JSON data loader with various options. Either parses the entire string, cleaning it and treat each line as an embedding or performs a recursive depth-first traversal yielding JSON paths. +Supports streaming of large JSON data using [@discoveryjs/json-ext](https://github.com/discoveryjs/json-ext) ## Usage @@ -20,12 +21,16 @@ const docsFromContent = reader.loadDataAsContent(content); Basic: +- `streamingThreshold?`: The threshold for using streaming mode in MB of the JSON Data. 
Estimates characters by calculating bytes: `(streamingThreshold * 1024 * 1024) / 2` and comparing against `.length` of the JSON string. Set `undefined` to disable streaming or `0` to always use streaming. Default is `50` MB. + - `ensureAscii?`: Wether to ensure only ASCII characters be present in the output by converting non-ASCII characters to their unicode escape sequence. Default is `false`. -- `isJsonLines?`: Wether the JSON is in JSON Lines format. If true, will split into lines, remove empty one and parse each line as JSON. Default is `false` +- `isJsonLines?`: Whether the JSON is in JSON Lines format. If true, will split into lines, remove empty ones and parse each line as JSON. Note: Uses a custom streaming parser, most likely less robust than json-ext. Default is `false` - `cleanJson?`: Whether to clean the JSON by filtering out structural characters (`{}, [], and ,`). If set to false, it will just parse the JSON, not removing structural characters. Default is `true`. +- `verbose?`: Whether to enable verbose logging. Default is `false` + Depth-First-Traversal: - `levelsBack?`: Specifies how many levels up the JSON structure to include in the output. `cleanJson` will be ignored. If set to 0, all levels are included. If undefined, parses the entire JSON, treat each line as an embedding and create a document per top-level array. Default is `undefined` From 35ce9fb93087a8a8142e56affb52e6e23a345965 Mon Sep 17 00:00:00 2001 From: "F. 
Wimmer" Date: Sun, 11 Aug 2024 03:30:30 +0000 Subject: [PATCH 07/18] fix lock --- pnpm-lock.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f801a35bf6..d3caf8690b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -571,7 +571,7 @@ importers: version: 2.0.0 mongodb: specifier: ^6.7.0 - version: 6.8.0(@aws-sdk/credential-providers@3.613.0) + version: 6.8.0(@aws-sdk/credential-providers@3.613.0(@aws-sdk/client-sso-oidc@3.613.0(@aws-sdk/client-sts@3.613.0))) notion-md-crawler: specifier: ^1.0.0 version: 1.0.0(encoding@0.1.13) From 178dfa14746d71f00fa2eadb9cf80a73d3424041 Mon Sep 17 00:00:00 2001 From: "F. Wimmer" Date: Sun, 11 Aug 2024 03:48:08 +0000 Subject: [PATCH 08/18] update test --- packages/llamaindex/tests/readers/JSONReader.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/llamaindex/tests/readers/JSONReader.test.ts b/packages/llamaindex/tests/readers/JSONReader.test.ts index 7673baef63..253cd9992c 100644 --- a/packages/llamaindex/tests/readers/JSONReader.test.ts +++ b/packages/llamaindex/tests/readers/JSONReader.test.ts @@ -20,9 +20,11 @@ describe("JSONReader", () => { describe("constructor", () => { it("should set default options", () => { expect(reader["options"]).toEqual({ + streamingThreshold: 50, ensureAscii: false, isJsonLines: false, cleanJson: true, + verbose: false, }); }); From 23e54621675da767ae51b42e05254b98d0673f69 Mon Sep 17 00:00:00 2001 From: "F. 
Wimmer" Date: Sun, 11 Aug 2024 15:50:23 +0000 Subject: [PATCH 09/18] increase verbosity, better error handling --- examples/readers/src/json.ts | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/examples/readers/src/json.ts b/examples/readers/src/json.ts index 02ef507dbc..fa2577a10f 100644 --- a/examples/readers/src/json.ts +++ b/examples/readers/src/json.ts @@ -1,4 +1,5 @@ -import { JSONReader } from "llamaindex"; +// import { JSONReader } from "llamaindex"; +import { JSONReader } from "../../../packages/llamaindex/src/readers/JSONReader.js"; async function main() { // Data @@ -11,39 +12,48 @@ async function main() { const jsonlBuffer = new TextEncoder().encode(jsonlContent); // Default settings - const reader1 = new JSONReader(); + const reader1 = new JSONReader({ streamingThreshold: 0, verbose: true }); const docs1 = await reader1.loadData(file); console.log(docs1[0]); // Unclean JSON - const reader2 = new JSONReader({ cleanJson: false }); + const reader2 = new JSONReader({ cleanJson: false, verbose: true }); const docs2 = await reader2.loadData(file); console.log(docs2[0]); // Depth first yield of JSON structural paths, going back 2 levels - const reader3 = new JSONReader({ levelsBack: 2 }); + const reader3 = new JSONReader({ levelsBack: 2, verbose: true }); const docs3 = await reader3.loadData(file); console.log(docs3[0]); // Depth first yield of all levels - const reader4 = new JSONReader({ levelsBack: 0 }); + const reader4 = new JSONReader({ levelsBack: 0, verbose: true }); const docs4 = await reader4.loadData(file); console.log(docs4[0]); // Depth first yield of all levels, collapse structural paths below length 100 - const reader5 = new JSONReader({ levelsBack: 0, collapseLength: 100 }); + const reader5 = new JSONReader({ + levelsBack: 0, + collapseLength: 100, + verbose: true, + }); const docs5 = await reader5.loadData(file); console.log(docs5[0]); // Convert ASCII to unichode escape sequences - const reader6 = 
new JSONReader({ ensureAscii: true }); + const reader6 = new JSONReader({ ensureAscii: true, verbose: true }); const docs6 = await reader6.loadDataAsContent(nonAsciiBuffer); console.log(docs6[0]); // JSON Lines Format - const reader7 = new JSONReader({ isJsonLines: true }); + const reader7 = new JSONReader({ + isJsonLines: true, + streamingThreshold: 0, + verbose: true, + }); const docs7 = await reader7.loadDataAsContent(jsonlBuffer); console.log(docs7[0]); + console.log(docs7[1]); } main().catch(console.error); From cc8bf1cc5ff8076acab1a3571230729a99d52d43 Mon Sep 17 00:00:00 2001 From: "F. Wimmer" Date: Sun, 11 Aug 2024 16:12:46 +0000 Subject: [PATCH 10/18] factor out parser --- packages/llamaindex/src/readers/JSONReader.ts | 317 +++++++++++------- 1 file changed, 194 insertions(+), 123 deletions(-) diff --git a/packages/llamaindex/src/readers/JSONReader.ts b/packages/llamaindex/src/readers/JSONReader.ts index d7b610b969..37d623b2b2 100644 --- a/packages/llamaindex/src/readers/JSONReader.ts +++ b/packages/llamaindex/src/readers/JSONReader.ts @@ -2,6 +2,10 @@ import { parseChunked } from "@discoveryjs/json-ext"; import type { JSONValue } from "@llamaindex/core/global"; import { Document, FileReader } from "@llamaindex/core/schema"; +// Possible improvements: +// - use `json-ext` for streaming JSON.stringify. Currently once JSON.stringify is called, the data is already chunked, so there should be no high risk of memory issues +// --> json-ext can use `stringifyInfo` to get the minimum byte lengths as well as return any circular references found, could be used to avoid erroring on circular references + export interface JSONReaderOptions { /** * The threshold for using streaming mode. @@ -52,18 +56,126 @@ export interface JSONReaderOptions { * @default undefined */ collapseLength?: number; + /** * Whether to enable verbose logging. * * @default false */ verbose?: boolean; + + /** + * Provide a custom logger function. 
+ * + * @default undefined + */ + logger?: (level: string, message: string) => void; } export class JSONReaderError extends Error {} export class JSONParseError extends JSONReaderError {} export class JSONStringifyError extends JSONReaderError {} +class JSONParser { + static async *parseJsonString( + jsonStr: string, + isJsonLines: boolean, + ): AsyncGenerator { + try { + if (isJsonLines) { + for await (const value of JSONParser.parseJsonLines(jsonStr)) { + yield value; + } + } else { + yield JSON.parse(jsonStr); + } + } catch (e) { + throw new JSONParseError( + `Error parsing JSON string: ${e instanceof Error ? e.message : "Unknown error occurred"}`, + { cause: e }, + ); + } + } + + static async *parseJsonStream( + content: Uint8Array, + isJsonLines: boolean, + ): AsyncGenerator { + const stream = new ReadableStream({ + async start(controller) { + controller.enqueue(content); + controller.close(); + }, + }); + + try { + if (isJsonLines) { + yield* JSONParser.parseJsonLinesStream(stream); + } else { + yield* await parseChunked(stream); + } + } catch (e) { + throw new JSONParseError( + `Error parsing JSON stream: ${e instanceof Error ? e.message : "Unknown error occurred"}`, + { cause: e }, + ); + } + } + + static async *parseJsonLines(jsonStr: string): AsyncGenerator { + try { + for (const line of jsonStr.split("\n")) { + if (line.trim() !== "") { + yield JSON.parse(line.trim()); + } + } + } catch (e) { + throw new JSONParseError( + `Error parsing JSON Line: ${ + e instanceof Error ? 
e.message : "Unknown error occurred" + }`, + { cause: e }, + ); + } + } + + static async *parseJsonLinesStream( + stream: ReadableStream, + ): AsyncGenerator { + const reader = stream.getReader(); + const decoder = new TextDecoder("utf-8"); + let buffer = ""; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; + + for (const line of lines) { + if (line.trim()) { + yield JSON.parse(line.trim()); + } + } + } + + if (buffer.trim()) { + yield JSON.parse(buffer.trim()); + } + } catch (e) { + throw new JSONParseError( + `Error parsing JSON Line in stream: ${ + e instanceof Error ? e.message : "Unknown error occurred" + }`, + { cause: e }, + ); + } + } +} + /** * A reader that reads JSON data and returns an array of Document objects. * Supports various options to modify the output. @@ -73,29 +185,57 @@ export class JSONReader extends FileReader { constructor(options: JSONReaderOptions = {}) { super(); - this.options = { + this.options = this.initializeOptions(options); + this.log( + "info", + `JSONReader initialized with options: ${JSON.stringify(this.options)}`, + ); + } + + private initializeOptions( + providedOptions: JSONReaderOptions, + ): JSONReaderOptions { + const defaultOptions: JSONReaderOptions = { streamingThreshold: 50, ensureAscii: false, isJsonLines: false, cleanJson: true, verbose: false, - ...options, }; - this.validateOptions(); - this.log( - `JSONReader initialized with options: ${JSON.stringify(this.options)}`, - ); + + const options = { ...defaultOptions, ...providedOptions }; + + // Validate options immediately after merging + this.validateOptions(options, [ + "streamingThreshold", + "collapseLength", + "levelsBack", + ]); + + return options; } - private validateOptions(): void { - const { levelsBack, collapseLength, streamingThreshold } = this.options; - if (levelsBack !== undefined && 
levelsBack < 0) { - throw new JSONReaderError("levelsBack must not be negative"); - } - if (collapseLength !== undefined && collapseLength < 0) { - throw new JSONReaderError("collapseLength must not be negative"); + + private validateOptions( + options: JSONReaderOptions, + keys: (keyof JSONReaderOptions)[], + ): void { + for (const key of keys) { + const value = options[key]; + if (typeof value === "number" && value < 0) { + throw new JSONReaderError(`${key} must not be negative`); + } } - if (streamingThreshold !== undefined && streamingThreshold < 0) { - throw new JSONReaderError("streamingThreshold must not be negative"); + } + + private log(level: string, message: string): void { + if (this.options.logger) { + this.options.logger(level, message); // Use custom logger if provided + } else { + const timestamp = new Date().toISOString(); + const formattedMessage = `[${timestamp}] [${level.toUpperCase()}]: ${message}`; + if (this.options.verbose || level !== "debug") { + console.log(`${formattedMessage}`); + } } } @@ -123,17 +263,23 @@ export class JSONReader extends FileReader { const parsedData = jsonStr.length > limit - ? this.parseJsonStream(content) - : this.parseJsonString(jsonStr); + ? JSONParser.parseJsonStream(content, this.options.isJsonLines ?? false) + : JSONParser.parseJsonString( + jsonStr, + this.options.isJsonLines ?? false, + ); this.log( + "debug", `Using ${jsonStr.length > limit ? "streaming parser" : "JSON.parse"} as string length ${jsonStr.length > limit ? "exceeds" : "is less than"} calculated character limit: "${limit}"`, ); for await (const value of parsedData) { + // Yield the parsed data directly if cleanJson is false or the value is not an array. if (!this.options.cleanJson || !Array.isArray(value)) { yield await this.createDocument(value as T); } else { + // If it's an Array, yield each item seperately, i.e. 
create a document per top-level array of the json for (const item of value) { yield await this.createDocument(item as T); } @@ -141,89 +287,6 @@ export class JSONReader extends FileReader { } } - private async *parseJsonString(jsonStr: string): AsyncGenerator { - try { - if (this.options.isJsonLines) { - this.log("Parsing JSON String in JSON Lines format"); - for await (const value of this.parseJsonLines(jsonStr)) { - // Yield each JSON line separately if in JSON Lines format - yield value; - } - } else { - this.log("Parsing JSON String"); - const parsedData = JSON.parse(jsonStr); - yield parsedData; - } - } catch (e) { - throw new JSONParseError(`Error parsing JSON: ${e} in "${jsonStr}"`); - } - } - - private async *parseJsonStream( - content: Uint8Array, - ): AsyncGenerator { - const stream = new ReadableStream({ - async start(controller) { - controller.enqueue(content); - controller.close(); - }, - }); - - if (this.options.isJsonLines) { - this.log("Parsing JSON Stream in JSON Lines format"); - yield* this.parseJsonLinesStream(stream); - return; - } - this.log("Parsing JSON Stream"); - yield await parseChunked(stream); - } - - private async *parseJsonLines(jsonStr: string): AsyncGenerator { - // Process each line as a separate JSON object for JSON Lines format - for (const line of jsonStr.split("\n")) { - if (line.trim() !== "") { - try { - yield JSON.parse(line.trim()); - } catch (e) { - throw new JSONParseError( - `Error parsing JSON Line: ${e} in "${line.trim()}"`, - ); - } - } - } - } - - private async *parseJsonLinesStream( - stream: ReadableStream, - ): AsyncGenerator { - const reader = stream.getReader(); - const decoder = new TextDecoder("utf-8"); - let buffer = ""; - - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - // Decode the streamed data incrementally - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split("\n"); - - // Keep the incomplete line in the buffer - buffer = lines.pop() || 
""; - - for (const line of lines) { - if (line.trim()) { - yield JSON.parse(line.trim()); - } - } - } - - // Parse any remaining data in the buffer - if (buffer.trim()) { - yield JSON.parse(buffer.trim()); - } - } - private async createDocument(data: T): Promise { const docText: string = this.options.levelsBack === undefined @@ -268,17 +331,19 @@ export class JSONReader extends FileReader { this.options.cleanJson ? 1 : 0, ); if (this.options.cleanJson) { - // Clean JSON by removing structural characters and unnecessary whitespace return jsonStr .split("\n") .filter((line) => !/^[{}\[\],]*$/.test(line.trim())) - .map((line) => line.trimStart()) // Removes the indent + .map((line) => line.trimStart()) .join("\n"); } return jsonStr; } catch (e) { throw new JSONStringifyError( - `Error stringifying JSON: ${e} in "${JSON.stringify(data)}"`, + `Error stringifying JSON data: ${ + e instanceof Error ? e.message : "Unknown error occurred" + }`, + { cause: e }, ); } } @@ -301,18 +366,18 @@ export class JSONReader extends FileReader { path: string[], collapseLength?: number, ): AsyncGenerator { - try { - const jsonStr = this.serializeAndCollapse( - jsonData, - levelsBack, - path, - collapseLength, - ); - if (jsonStr !== null) { - yield jsonStr; - return; - } + const jsonStr = this.serializeAndCollapse( + jsonData, + levelsBack, + path, + collapseLength, + ); + if (jsonStr !== null) { + yield jsonStr; + return; + } + try { if (jsonData !== null && typeof jsonData === "object") { yield* this.depthFirstTraversal( jsonData, @@ -325,7 +390,10 @@ export class JSONReader extends FileReader { } } catch (e) { throw new JSONReaderError( - `Error during depth first traversal at path ${path.join(" ")}: ${e}`, + `Error during depth-first traversal at path ${path.join(" ")}: ${ + e instanceof Error ? e.message : "Unknown error occurred" + }`, + { cause: e }, ); } } @@ -342,9 +410,15 @@ export class JSONReader extends FileReader { ? 
`${path.slice(-levelsBack).join(" ")} ${jsonStr}` : null; } catch (e) { - throw new JSONStringifyError(`Error stringifying JSON data: ${e}`); + throw new JSONStringifyError( + `Error stringifying JSON data: ${ + e instanceof Error ? e.message : "Unknown error occurred" + }`, + { cause: e }, + ); } } + /** * A generator function that performs a depth-first traversal of the JSON data. * If the JSON data is an array, it traverses each item in the array. @@ -385,20 +459,17 @@ export class JSONReader extends FileReader { } } catch (e) { throw new JSONReaderError( - `Error during depth-first traversal of object: ${e}`, + `Error during depth-first traversal of object: ${ + e instanceof Error ? e.message : "Unknown error occurred" + }`, + { cause: e }, ); } } - private convertToAscii(str: string): string { return str.replace( /[\u007F-\uFFFF]/g, (char) => `\\u${char.charCodeAt(0).toString(16).padStart(4, "0")}`, ); } - private log(message: string): void { - if (this.options.verbose) { - console.log(message); - } - } } From a21c16ce263dc39bef8e2211c32a41d749b74de7 Mon Sep 17 00:00:00 2001 From: "F. Wimmer" Date: Mon, 12 Aug 2024 09:38:10 +0000 Subject: [PATCH 11/18] refactor --- packages/llamaindex/src/readers/JSONReader.ts | 203 +++++++++--------- 1 file changed, 97 insertions(+), 106 deletions(-) diff --git a/packages/llamaindex/src/readers/JSONReader.ts b/packages/llamaindex/src/readers/JSONReader.ts index 37d623b2b2..0eefa243a8 100644 --- a/packages/llamaindex/src/readers/JSONReader.ts +++ b/packages/llamaindex/src/readers/JSONReader.ts @@ -65,7 +65,7 @@ export interface JSONReaderOptions { verbose?: boolean; /** - * Provide a custom logger function. + * A placeholder for a custom logging function. 
* * @default undefined */ @@ -77,65 +77,46 @@ export class JSONParseError extends JSONReaderError {} export class JSONStringifyError extends JSONReaderError {} class JSONParser { - static async *parseJsonString( - jsonStr: string, + static async *parse( + content: string | Uint8Array, isJsonLines: boolean, + isStream: boolean, + log: (level: string, message: string) => void, ): AsyncGenerator { try { - if (isJsonLines) { - for await (const value of JSONParser.parseJsonLines(jsonStr)) { - yield value; - } - } else { - yield JSON.parse(jsonStr); - } - } catch (e) { - throw new JSONParseError( - `Error parsing JSON string: ${e instanceof Error ? e.message : "Unknown error occurred"}`, - { cause: e }, - ); - } - } - - static async *parseJsonStream( - content: Uint8Array, - isJsonLines: boolean, - ): AsyncGenerator { - const stream = new ReadableStream({ - async start(controller) { - controller.enqueue(content); - controller.close(); - }, - }); - - try { - if (isJsonLines) { - yield* JSONParser.parseJsonLinesStream(stream); + log("debug", `Parsing JSON ${isJsonLines ? "as JSON Lines" : ""}`); + + if (isStream) { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(content as Uint8Array); + controller.close(); + }, + }); + yield* isJsonLines + ? this.parseJsonLinesStream(stream) + : (async function* () { + yield parseChunked(stream); + })(); } else { - yield* await parseChunked(stream); + const jsonStr = content as string; + yield* isJsonLines + ? this.parseJsonLines(jsonStr) + : (async function* () { + yield JSON.parse(jsonStr); + })(); } } catch (e) { throw new JSONParseError( - `Error parsing JSON stream: ${e instanceof Error ? e.message : "Unknown error occurred"}`, + `Error parsing JSON: ${e instanceof Error ? 
e.message : "Unknown error occurred"}`, { cause: e }, ); } } static async *parseJsonLines(jsonStr: string): AsyncGenerator { - try { - for (const line of jsonStr.split("\n")) { - if (line.trim() !== "") { - yield JSON.parse(line.trim()); - } - } - } catch (e) { - throw new JSONParseError( - `Error parsing JSON Line: ${ - e instanceof Error ? e.message : "Unknown error occurred" - }`, - { cause: e }, - ); + for (const line of jsonStr.split("\n").filter((l) => l.trim())) { + yield JSON.parse(line.trim()); } } @@ -146,32 +127,23 @@ class JSONParser { const decoder = new TextDecoder("utf-8"); let buffer = ""; - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; + while (true) { + const { done, value } = await reader.read(); + if (done) break; - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split("\n"); - buffer = lines.pop() || ""; + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; - for (const line of lines) { - if (line.trim()) { - yield JSON.parse(line.trim()); - } + for (const line of lines) { + if (line.trim()) { + yield JSON.parse(line.trim()); } } + } - if (buffer.trim()) { - yield JSON.parse(buffer.trim()); - } - } catch (e) { - throw new JSONParseError( - `Error parsing JSON Line in stream: ${ - e instanceof Error ? 
e.message : "Unknown error occurred" - }`, - { cause: e }, - ); + if (buffer.trim()) { + yield JSON.parse(buffer.trim()); } } } @@ -182,19 +154,16 @@ class JSONParser { */ export class JSONReader extends FileReader { private options: JSONReaderOptions; + private initialized: Promise; constructor(options: JSONReaderOptions = {}) { super(); - this.options = this.initializeOptions(options); - this.log( - "info", - `JSONReader initialized with options: ${JSON.stringify(this.options)}`, - ); + this.options = this.initializeAndValidateOptions(options); + this.initialized = this.initialize(); // Kick off async initialization } - private initializeOptions( - providedOptions: JSONReaderOptions, - ): JSONReaderOptions { + // Initialize options with validation + private initializeAndValidateOptions(providedOptions: JSONReaderOptions): JSONReaderOptions { const defaultOptions: JSONReaderOptions = { streamingThreshold: 50, ensureAscii: false, @@ -205,37 +174,52 @@ export class JSONReader extends FileReader { const options = { ...defaultOptions, ...providedOptions }; - // Validate options immediately after merging - this.validateOptions(options, [ - "streamingThreshold", - "collapseLength", - "levelsBack", - ]); + // Validation logic for numeric options + const numericOptions: (keyof JSONReaderOptions)[] = ['streamingThreshold', 'collapseLength', 'levelsBack']; + for (const key of numericOptions) { + if (typeof options[key] === 'number' && options[key]! 
< 0) { + throw new JSONReaderError(`${key} must not be negative`); + } + } return options; } - private validateOptions( - options: JSONReaderOptions, - keys: (keyof JSONReaderOptions)[], - ): void { - for (const key of keys) { - const value = options[key]; - if (typeof value === "number" && value < 0) { - throw new JSONReaderError(`${key} must not be negative`); - } + // Asynchronous initialization + private async initialize(): Promise { + try { + await this.log( + "info", + `JSONReader initialized with options: "${JSON.stringify(this.options)}"`, + ); + // Additional async initialization can be performed here if necessary + } catch (e) { + throw new JSONReaderError( + `Error initializing JSONReader: ${e instanceof Error ? e.message : "Unknown error occurred"}`, + { cause: e }, + ); } } - private log(level: string, message: string): void { + /** + * Method to ensure initialization is complete before proceeding. + * Can be called externally to ensure any async operations during initialization have completed. + * + * @return {Promise} A promise that resolves when initialization is complete. 
+ */ + public async ensureInitialized(): Promise { + return this.initialized; + } + + protected async log(level: string, message: string): Promise { + const shouldLog = this.options.verbose || level !== "debug"; + const timestamp = new Date().toISOString(); + const formattedMessage = `[${timestamp}] [${level.toUpperCase()}]: ${message}`; + if (this.options.logger) { - this.options.logger(level, message); // Use custom logger if provided - } else { - const timestamp = new Date().toISOString(); - const formattedMessage = `[${timestamp}] [${level.toUpperCase()}]: ${message}`; - if (this.options.verbose || level !== "debug") { - console.log(`${formattedMessage}`); - } + this.options.logger(level, message); + } else if (shouldLog) { + console.log(formattedMessage); } } @@ -247,14 +231,14 @@ export class JSONReader extends FileReader { */ async loadDataAsContent(content: Uint8Array): Promise { const documents: Document[] = []; - - const parser = this.parseJson(content); - for await (const document of parser) { + + for await (const document of this.parseJson(content)) { documents.push(document); } - + return documents; } + private async *parseJson(content: Uint8Array): AsyncGenerator { const jsonStr = new TextDecoder("utf-8").decode(content); @@ -263,13 +247,20 @@ export class JSONReader extends FileReader { const parsedData = jsonStr.length > limit - ? JSONParser.parseJsonStream(content, this.options.isJsonLines ?? false) - : JSONParser.parseJsonString( + ? JSONParser.parse( + content, + this.options.isJsonLines ?? false, + true, + this.log.bind(this), + ) + : JSONParser.parse( jsonStr, this.options.isJsonLines ?? false, + false, + this.log.bind(this), ); - this.log( + await this.log( "debug", `Using ${jsonStr.length > limit ? "streaming parser" : "JSON.parse"} as string length ${jsonStr.length > limit ? "exceeds" : "is less than"} calculated character limit: "${limit}"`, ); From 94cad05217069878c6fd65a4936c32603beadc5e Mon Sep 17 00:00:00 2001 From: "F. 
Wimmer" Date: Mon, 12 Aug 2024 23:41:17 +0000 Subject: [PATCH 12/18] use global logger --- packages/llamaindex/src/readers/JSONReader.ts | 188 +++++++----------- 1 file changed, 70 insertions(+), 118 deletions(-) diff --git a/packages/llamaindex/src/readers/JSONReader.ts b/packages/llamaindex/src/readers/JSONReader.ts index 0eefa243a8..7aa6aca370 100644 --- a/packages/llamaindex/src/readers/JSONReader.ts +++ b/packages/llamaindex/src/readers/JSONReader.ts @@ -1,10 +1,12 @@ import { parseChunked } from "@discoveryjs/json-ext"; import type { JSONValue } from "@llamaindex/core/global"; import { Document, FileReader } from "@llamaindex/core/schema"; +import { ReadableStream } from "@llamaindex/env"; +import { type Logger, consoleLogger } from "../internal/logger.js"; // Possible improvements: // - use `json-ext` for streaming JSON.stringify. Currently once JSON.stringify is called, the data is already chunked, so there should be no high risk of memory issues -// --> json-ext can use `stringifyInfo` to get the minimum byte lengths as well as return any circular references found, could be used to avoid erroring on circular references +// --> json-ext can use `stringifyInfo` to get the minimum byte lengths as well as return any circular references found, could be used to avoid erroring on circular references during stringification export interface JSONReaderOptions { /** @@ -56,20 +58,12 @@ export interface JSONReaderOptions { * @default undefined */ collapseLength?: number; - - /** - * Whether to enable verbose logging. - * - * @default false - */ - verbose?: boolean; - /** * A placeholder for a custom logging function. 
* - * @default undefined + * @default consoleLogger */ - logger?: (level: string, message: string) => void; + logger?: Logger; } export class JSONReaderError extends Error {} @@ -81,39 +75,40 @@ class JSONParser { content: string | Uint8Array, isJsonLines: boolean, isStream: boolean, - log: (level: string, message: string) => void, + logger?: Logger, ): AsyncGenerator { - try { - log("debug", `Parsing JSON ${isJsonLines ? "as JSON Lines" : ""}`); - - if (isStream) { - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(content as Uint8Array); - controller.close(); - }, - }); - yield* isJsonLines - ? this.parseJsonLinesStream(stream) - : (async function* () { - yield parseChunked(stream); - })(); - } else { - const jsonStr = content as string; - yield* isJsonLines - ? this.parseJsonLines(jsonStr) - : (async function* () { - yield JSON.parse(jsonStr); - })(); + logger?.log(`Parsing JSON ${isJsonLines ? "as JSON Lines" : ""}`); + + if (isStream) { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(content as Uint8Array); + controller.close(); + }, + }); + yield* this.parseStream(stream, isJsonLines); + } else { + yield* this.parseString(content as string, isJsonLines); + } } - } catch (e) { - throw new JSONParseError( - `Error parsing JSON: ${e instanceof Error ? 
e.message : "Unknown error occurred"}`, - { cause: e }, - ); - } - } - + + + private static async *parseStream(stream: ReadableStream, isJsonLines: boolean) { + if (isJsonLines) { + yield* this.parseJsonLinesStream(stream); + } else { + yield await parseChunked(stream); + } + } + + private static async *parseString(jsonStr: string, isJsonLines: boolean) { + if (isJsonLines) { + yield* this.parseJsonLines(jsonStr); + } else { + yield JSON.parse(jsonStr); + } + } + static async *parseJsonLines(jsonStr: string): AsyncGenerator { for (const line of jsonStr.split("\n").filter((l) => l.trim())) { yield JSON.parse(line.trim()); @@ -152,32 +147,36 @@ class JSONParser { * A reader that reads JSON data and returns an array of Document objects. * Supports various options to modify the output. */ -export class JSONReader extends FileReader { +export class JSONReader extends FileReader { private options: JSONReaderOptions; - private initialized: Promise; constructor(options: JSONReaderOptions = {}) { super(); - this.options = this.initializeAndValidateOptions(options); - this.initialized = this.initialize(); // Kick off async initialization + this.options = this.normalizeOptions(options); } // Initialize options with validation - private initializeAndValidateOptions(providedOptions: JSONReaderOptions): JSONReaderOptions { + private normalizeOptions( + providedOptions: JSONReaderOptions, + ): JSONReaderOptions { const defaultOptions: JSONReaderOptions = { streamingThreshold: 50, ensureAscii: false, isJsonLines: false, cleanJson: true, - verbose: false, + logger: consoleLogger, }; const options = { ...defaultOptions, ...providedOptions }; // Validation logic for numeric options - const numericOptions: (keyof JSONReaderOptions)[] = ['streamingThreshold', 'collapseLength', 'levelsBack']; + const numericOptions: (keyof JSONReaderOptions)[] = [ + "streamingThreshold", + "collapseLength", + "levelsBack", + ]; for (const key of numericOptions) { - if (typeof options[key] === 'number' 
&& options[key]! < 0) { + if (typeof options[key] === "number" && options[key]! < 0) { throw new JSONReaderError(`${key} must not be negative`); } } @@ -185,44 +184,6 @@ export class JSONReader extends FileReader { return options; } - // Asynchronous initialization - private async initialize(): Promise { - try { - await this.log( - "info", - `JSONReader initialized with options: "${JSON.stringify(this.options)}"`, - ); - // Additional async initialization can be performed here if necessary - } catch (e) { - throw new JSONReaderError( - `Error initializing JSONReader: ${e instanceof Error ? e.message : "Unknown error occurred"}`, - { cause: e }, - ); - } - } - - /** - * Method to ensure initialization is complete before proceeding. - * Can be called externally to ensure any async operations during initialization have completed. - * - * @return {Promise} A promise that resolves when initialization is complete. - */ - public async ensureInitialized(): Promise { - return this.initialized; - } - - protected async log(level: string, message: string): Promise { - const shouldLog = this.options.verbose || level !== "debug"; - const timestamp = new Date().toISOString(); - const formattedMessage = `[${timestamp}] [${level.toUpperCase()}]: ${message}`; - - if (this.options.logger) { - this.options.logger(level, message); - } else if (shouldLog) { - console.log(formattedMessage); - } - } - /** * Loads JSON data and returns an array of Document objects. * @@ -231,54 +192,45 @@ export class JSONReader extends FileReader { */ async loadDataAsContent(content: Uint8Array): Promise { const documents: Document[] = []; - + for await (const document of this.parseJson(content)) { documents.push(document); } - + return documents; } - private async *parseJson(content: Uint8Array): AsyncGenerator { const jsonStr = new TextDecoder("utf-8").decode(content); const limit = ((this.options.streamingThreshold ?? 
Infinity) * 1024 * 1024) / 2; + const shouldStream = jsonStr.length > limit; + + const parsedData = JSONParser.parse( + shouldStream ? content : jsonStr, + this.options.isJsonLines ?? false, + shouldStream ? true : false, + this.options.logger, + ); - const parsedData = - jsonStr.length > limit - ? JSONParser.parse( - content, - this.options.isJsonLines ?? false, - true, - this.log.bind(this), - ) - : JSONParser.parse( - jsonStr, - this.options.isJsonLines ?? false, - false, - this.log.bind(this), - ); - - await this.log( - "debug", - `Using ${jsonStr.length > limit ? "streaming parser" : "JSON.parse"} as string length ${jsonStr.length > limit ? "exceeds" : "is less than"} calculated character limit: "${limit}"`, + this.options.logger?.log( + `Using ${shouldStream ? "streaming parser" : "JSON.parse"} as string length ${shouldStream ? "exceeds" : "is less than"} calculated character limit: "${limit}"`, ); for await (const value of parsedData) { // Yield the parsed data directly if cleanJson is false or the value is not an array. if (!this.options.cleanJson || !Array.isArray(value)) { - yield await this.createDocument(value as T); + yield await this.createDocument(value as JSONValue); } else { // If it's an Array, yield each item seperately, i.e. create a document per top-level array of the json for (const item of value) { - yield await this.createDocument(item as T); + yield await this.createDocument(item as JSONValue); } } } } - private async createDocument(data: T): Promise { + private async createDocument(data: JSONValue): Promise { const docText: string = this.options.levelsBack === undefined ? this.formatJsonString(data) @@ -296,7 +248,7 @@ export class JSONReader extends FileReader { }); } - private async prepareDepthFirstYield(data: T): Promise { + private async prepareDepthFirstYield(data: JSONValue): Promise { const levelsBack = this.options.levelsBack ?? 
0; const results: string[] = []; @@ -314,7 +266,7 @@ export class JSONReader extends FileReader { // Note: JSON.stringify does not differentiate between indent "undefined/null"(= no whitespaces) and "0"(= no whitespaces, but linebreaks) // as python json.dumps does. Thats why we use indent 1 and remove the leading spaces. - private formatJsonString(data: T): string { + private formatJsonString(data: JSONValue): string { try { const jsonStr = JSON.stringify( data, @@ -352,7 +304,7 @@ export class JSONReader extends FileReader { * @throws {JSONReaderError} - Throws an error if there is an issue during the depth-first traversal. */ private async *depthFirstYield( - jsonData: T, + jsonData: JSONValue, levelsBack: number, path: string[], collapseLength?: number, @@ -390,7 +342,7 @@ export class JSONReader extends FileReader { } private serializeAndCollapse( - jsonData: T, + jsonData: JSONValue, levelsBack: number, path: string[], collapseLength?: number, @@ -423,7 +375,7 @@ export class JSONReader extends FileReader { * @throws {JSONReaderError} - Throws an error if there is an issue during the depth-first traversal of the object. */ private async *depthFirstTraversal( - jsonData: T, + jsonData: JSONValue, levelsBack: number, path: string[], collapseLength?: number, @@ -439,7 +391,7 @@ export class JSONReader extends FileReader { path.push(key); if (value !== null) { yield* this.depthFirstYield( - value as T, + value, levelsBack, path, collapseLength, From 02737b319efbc5417406b42f10edcb6d8f174811 Mon Sep 17 00:00:00 2001 From: "F. 
Wimmer" Date: Tue, 13 Aug 2024 02:36:01 +0000 Subject: [PATCH 13/18] refactor --- packages/llamaindex/src/readers/JSONReader.ts | 225 ++++++++---------- 1 file changed, 101 insertions(+), 124 deletions(-) diff --git a/packages/llamaindex/src/readers/JSONReader.ts b/packages/llamaindex/src/readers/JSONReader.ts index 7aa6aca370..1dfd61be97 100644 --- a/packages/llamaindex/src/readers/JSONReader.ts +++ b/packages/llamaindex/src/readers/JSONReader.ts @@ -77,45 +77,57 @@ class JSONParser { isStream: boolean, logger?: Logger, ): AsyncGenerator { - logger?.log(`Parsing JSON ${isJsonLines ? "as JSON Lines" : ""}`); - - if (isStream) { - const stream = new ReadableStream({ - start(controller) { - controller.enqueue(content as Uint8Array); - controller.close(); - }, - }); - yield* this.parseStream(stream, isJsonLines); - } else { - yield* this.parseString(content as string, isJsonLines); - } + logger?.log(`Parsing JSON ${isJsonLines ? "as JSON Lines" : ""}`); + try { + if (isStream) { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(content as Uint8Array); + controller.close(); + }, + }); + yield* this.parseStream(stream, isJsonLines); + } else { + yield* this.parseString(content as string, isJsonLines); } - - - private static async *parseStream(stream: ReadableStream, isJsonLines: boolean) { - if (isJsonLines) { - yield* this.parseJsonLinesStream(stream); - } else { - yield await parseChunked(stream); - } - } - - private static async *parseString(jsonStr: string, isJsonLines: boolean) { - if (isJsonLines) { - yield* this.parseJsonLines(jsonStr); - } else { - yield JSON.parse(jsonStr); - } - } - - static async *parseJsonLines(jsonStr: string): AsyncGenerator { + } catch (e) { + throw new JSONParseError( + `Failed to parse JSON ${isJsonLines ? "as JSON Lines" : ""} ${isStream ? "while streaming" : ""}: ${ + e instanceof Error ? 
e.message : "Unknown error occurred" + }`, + { cause: e }, + ); + } + } + + private static async *parseStream( + stream: ReadableStream, + isJsonLines: boolean, + ) { + if (isJsonLines) { + yield* this.parseJsonLinesStream(stream); + } else { + yield await parseChunked(stream); + } + } + + private static async *parseString(jsonStr: string, isJsonLines: boolean) { + if (isJsonLines) { + yield* this.parseJsonLines(jsonStr); + } else { + yield JSON.parse(jsonStr); + } + } + + private static async *parseJsonLines( + jsonStr: string, + ): AsyncGenerator { for (const line of jsonStr.split("\n").filter((l) => l.trim())) { yield JSON.parse(line.trim()); } } - static async *parseJsonLinesStream( + private static async *parseJsonLinesStream( stream: ReadableStream, ): AsyncGenerator { const reader = stream.getReader(); @@ -190,7 +202,7 @@ export class JSONReader extends FileReader { * @param {Uint8Array} content - The JSON data as a Uint8Array. * @return {Promise} A Promise that resolves to an array of Document objects. */ - async loadDataAsContent(content: Uint8Array): Promise { + public async loadDataAsContent(content: Uint8Array): Promise { const documents: Document[] = []; for await (const document of this.parseJson(content)) { @@ -248,21 +260,6 @@ export class JSONReader extends FileReader { }); } - private async prepareDepthFirstYield(data: JSONValue): Promise { - const levelsBack = this.options.levelsBack ?? 0; - const results: string[] = []; - - for await (const value of this.depthFirstYield( - data, - levelsBack === 0 ? Infinity : levelsBack, - [], - this.options.collapseLength, - )) { - results.push(value); - } - return results.join("\n"); - } - // Note: JSON.stringify does not differentiate between indent "undefined/null"(= no whitespaces) and "0"(= no whitespaces, but linebreaks) // as python json.dumps does. Thats why we use indent 1 and remove the leading spaces. 
@@ -291,17 +288,25 @@ export class JSONReader extends FileReader { } } + private async prepareDepthFirstYield(data: JSONValue): Promise { + const levelsBack = this.options.levelsBack ?? 0; + const results: string[] = []; + + for await (const value of this.depthFirstYield( + data, + levelsBack === 0 ? Infinity : levelsBack, + [], + this.options.collapseLength, + )) { + results.push(value); + } + return results.join("\n"); + } + /** * A generator function that determines the next step in traversing the JSON data. - * If the serialized JSON string is not null, it yields the string and returns. - * If the JSON data is an object, it delegates the traversal to the depthFirstTraversal method. - * Otherwise, it yields the JSON data as a string. - * - * @param jsonData - The JSON data to traverse. - * @param levelsBack - The number of levels up the JSON structure to include in the output. - * @param path - The current path in the JSON structure. - * @param collapseLength - The maximum length of JSON string representation to be collapsed into a single line. - * @throws {JSONReaderError} - Throws an error if there is an issue during the depth-first traversal. + * If collapseLength is set and the serialized JSON string is not null, it yields the collapsed string. + * Otherwise it delegates traversal to the `traverseJsonData` function. */ private async *depthFirstYield( jsonData: JSONValue, @@ -317,27 +322,46 @@ export class JSONReader extends FileReader { ); if (jsonStr !== null) { yield jsonStr; - return; + } else { + yield* this.traverseJsonData(jsonData, levelsBack, path, collapseLength); } + } - try { - if (jsonData !== null && typeof jsonData === "object") { - yield* this.depthFirstTraversal( - jsonData, - levelsBack, - path, - collapseLength, - ); - } else { - yield `${path.slice(-levelsBack).join(" ")} ${String(jsonData)}`; + /** + * Traverse the JSON data. + * If the data is an array, it traverses each item in the array. 
+ * If the data is an object, it delegates traversal to the `traverseObject` function. + * If the data is a primitive value, it yields the value with the path. + * + */ + private async *traverseJsonData( + jsonData: JSONValue, + levelsBack: number, + path: string[], + collapseLength?: number, + ): AsyncGenerator { + if (Array.isArray(jsonData)) { + for (const item of jsonData) { + yield* this.depthFirstYield(item, levelsBack, path, collapseLength); } - } catch (e) { - throw new JSONReaderError( - `Error during depth-first traversal at path ${path.join(" ")}: ${ - e instanceof Error ? e.message : "Unknown error occurred" - }`, - { cause: e }, - ); + } else if (jsonData !== null && typeof jsonData === "object") { + yield* this.traverseObject(jsonData, levelsBack, path, collapseLength); + } else { + yield `${path.slice(-levelsBack).join(" ")} ${String(jsonData)}`; + } + } + + private async *traverseObject( + jsonObject: Record, + levelsBack: number, + path: string[], + collapseLength?: number, + ): AsyncGenerator { + const originalLength = path.length; + for (const [key, value] of Object.entries(jsonObject)) { + path.push(key); + yield* this.depthFirstYield(value, levelsBack, path, collapseLength); + path.length = originalLength; // Reset path length to original. Avoids cloning the path array every time. } } @@ -362,55 +386,8 @@ export class JSONReader extends FileReader { } } - /** - * A generator function that performs a depth-first traversal of the JSON data. - * If the JSON data is an array, it traverses each item in the array. - * If the JSON data is an object, it traverses each key-value pair in the object. - * For each traversed item or value, it performs a depth-first yield. - * - * @param jsonData - The JSON data to traverse. - * @param levelsBack - The number of levels up the JSON structure to include in the output. - * @param path - The current path in the JSON structure. 
- * @param collapseLength - The maximum length of JSON string representation to be collapsed into a single line. - * @throws {JSONReaderError} - Throws an error if there is an issue during the depth-first traversal of the object. - */ - private async *depthFirstTraversal( - jsonData: JSONValue, - levelsBack: number, - path: string[], - collapseLength?: number, - ): AsyncGenerator { - try { - if (Array.isArray(jsonData)) { - for (const item of jsonData) { - yield* this.depthFirstYield(item, levelsBack, path, collapseLength); - } - } else if (jsonData !== null && typeof jsonData === "object") { - const originalLength = path.length; - for (const [key, value] of Object.entries(jsonData)) { - path.push(key); - if (value !== null) { - yield* this.depthFirstYield( - value, - levelsBack, - path, - collapseLength, - ); - } - path.length = originalLength; // Reset path length to original. Avoids cloning the path array every time. - } - } - } catch (e) { - throw new JSONReaderError( - `Error during depth-first traversal of object: ${ - e instanceof Error ? e.message : "Unknown error occurred" - }`, - { cause: e }, - ); - } - } private convertToAscii(str: string): string { - return str.replace( + return str.replaceAll( /[\u007F-\uFFFF]/g, (char) => `\\u${char.charCodeAt(0).toString(16).padStart(4, "0")}`, ); From 8ddb793923b401ec0b1045a8987040e7a7d9699e Mon Sep 17 00:00:00 2001 From: "F. 
Wimmer" Date: Tue, 13 Aug 2024 02:36:35 +0000 Subject: [PATCH 14/18] update test --- packages/llamaindex/tests/readers/JSONReader.test.ts | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/packages/llamaindex/tests/readers/JSONReader.test.ts b/packages/llamaindex/tests/readers/JSONReader.test.ts index 253cd9992c..36e537bff0 100644 --- a/packages/llamaindex/tests/readers/JSONReader.test.ts +++ b/packages/llamaindex/tests/readers/JSONReader.test.ts @@ -1,9 +1,4 @@ -import { - JSONParseError, - JSONReader, - JSONReaderError, - type JSONValue, -} from "llamaindex"; +import { JSONParseError, JSONReader, JSONReaderError } from "llamaindex"; import { beforeEach, describe, expect, it } from "vitest"; const content = new TextEncoder().encode( @@ -11,7 +6,7 @@ const content = new TextEncoder().encode( ); describe("JSONReader", () => { - let reader: JSONReader; + let reader: JSONReader; beforeEach(() => { reader = new JSONReader(); @@ -19,12 +14,11 @@ describe("JSONReader", () => { describe("constructor", () => { it("should set default options", () => { - expect(reader["options"]).toEqual({ + expect(reader["options"]).toMatchObject({ streamingThreshold: 50, ensureAscii: false, isJsonLines: false, cleanJson: true, - verbose: false, }); }); From 4ce2b3b6c4e26d0b4978cd6f45e488ad39cb779d Mon Sep 17 00:00:00 2001 From: "F. Wimmer" Date: Tue, 13 Aug 2024 02:37:04 +0000 Subject: [PATCH 15/18] update docs --- apps/docs/docs/modules/data_loaders/json.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/docs/docs/modules/data_loaders/json.md b/apps/docs/docs/modules/data_loaders/json.md index 584851c7bb..6f79951694 100644 --- a/apps/docs/docs/modules/data_loaders/json.md +++ b/apps/docs/docs/modules/data_loaders/json.md @@ -29,7 +29,7 @@ Basic: - `cleanJson?`: Whether to clean the JSON by filtering out structural characters (`{}, [], and ,`). If set to false, it will just parse the JSON, not removing structural characters. 
Default is `true`. -- `verbose?`: Whether to enable verbose logging. Default is `false` +- `logger?`: A placeholder for a custom logger function. Depth-First-Traversal: From e6ca4fda66062c136e81d06c7901b66d46956397 Mon Sep 17 00:00:00 2001 From: "F. Wimmer" Date: Tue, 13 Aug 2024 02:40:16 +0000 Subject: [PATCH 16/18] changeset --- .changeset/blue-bears-invite.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/blue-bears-invite.md diff --git a/.changeset/blue-bears-invite.md b/.changeset/blue-bears-invite.md new file mode 100644 index 0000000000..9028879a6a --- /dev/null +++ b/.changeset/blue-bears-invite.md @@ -0,0 +1,5 @@ +--- +"llamaindex": patch +--- + +feat: add JSON streaming to JSONReader From bf92f512d56b753f91dd45d5f46042920e550edb Mon Sep 17 00:00:00 2001 From: "F. Wimmer" Date: Tue, 13 Aug 2024 02:51:24 +0000 Subject: [PATCH 17/18] example --- examples/readers/src/json.ts | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/examples/readers/src/json.ts b/examples/readers/src/json.ts index fa2577a10f..02ef507dbc 100644 --- a/examples/readers/src/json.ts +++ b/examples/readers/src/json.ts @@ -1,5 +1,4 @@ -// import { JSONReader } from "llamaindex"; -import { JSONReader } from "../../../packages/llamaindex/src/readers/JSONReader.js"; +import { JSONReader } from "llamaindex"; async function main() { // Data @@ -12,48 +11,39 @@ async function main() { const jsonlBuffer = new TextEncoder().encode(jsonlContent); // Default settings - const reader1 = new JSONReader({ streamingThreshold: 0, verbose: true }); + const reader1 = new JSONReader(); const docs1 = await reader1.loadData(file); console.log(docs1[0]); // Unclean JSON - const reader2 = new JSONReader({ cleanJson: false, verbose: true }); + const reader2 = new JSONReader({ cleanJson: false }); const docs2 = await reader2.loadData(file); console.log(docs2[0]); // Depth first yield of JSON structural paths, going back 2 levels - const reader3 = 
new JSONReader({ levelsBack: 2, verbose: true }); + const reader3 = new JSONReader({ levelsBack: 2 }); const docs3 = await reader3.loadData(file); console.log(docs3[0]); // Depth first yield of all levels - const reader4 = new JSONReader({ levelsBack: 0, verbose: true }); + const reader4 = new JSONReader({ levelsBack: 0 }); const docs4 = await reader4.loadData(file); console.log(docs4[0]); // Depth first yield of all levels, collapse structural paths below length 100 - const reader5 = new JSONReader({ - levelsBack: 0, - collapseLength: 100, - verbose: true, - }); + const reader5 = new JSONReader({ levelsBack: 0, collapseLength: 100 }); const docs5 = await reader5.loadData(file); console.log(docs5[0]); // Convert ASCII to unichode escape sequences - const reader6 = new JSONReader({ ensureAscii: true, verbose: true }); + const reader6 = new JSONReader({ ensureAscii: true }); const docs6 = await reader6.loadDataAsContent(nonAsciiBuffer); console.log(docs6[0]); // JSON Lines Format - const reader7 = new JSONReader({ - isJsonLines: true, - streamingThreshold: 0, - verbose: true, - }); + const reader7 = new JSONReader({ isJsonLines: true }); const docs7 = await reader7.loadDataAsContent(jsonlBuffer); console.log(docs7[0]); - console.log(docs7[1]); } main().catch(console.error); From 20e2d057ca2d7af2a392ba1058003d71d9a22fd1 Mon Sep 17 00:00:00 2001 From: Alex Yang Date: Fri, 6 Sep 2024 10:25:58 -0700 Subject: [PATCH 18/18] Update packages/llamaindex/src/readers/JSONReader.ts --- packages/llamaindex/src/readers/JSONReader.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/llamaindex/src/readers/JSONReader.ts b/packages/llamaindex/src/readers/JSONReader.ts index 1dfd61be97..b1894e2a23 100644 --- a/packages/llamaindex/src/readers/JSONReader.ts +++ b/packages/llamaindex/src/readers/JSONReader.ts @@ -1,7 +1,6 @@ import { parseChunked } from "@discoveryjs/json-ext"; import type { JSONValue } from "@llamaindex/core/global"; import { Document, FileReader } from 
"@llamaindex/core/schema"; -import { ReadableStream } from "@llamaindex/env"; import { type Logger, consoleLogger } from "../internal/logger.js"; // Possible improvements: