diff --git a/.changeset/blue-bears-invite.md b/.changeset/blue-bears-invite.md
new file mode 100644
index 0000000000..9028879a6a
--- /dev/null
+++ b/.changeset/blue-bears-invite.md
@@ -0,0 +1,5 @@
+---
+"llamaindex": patch
+---
+
+feat: add JSON streaming to JSONReader
diff --git a/apps/docs/docs/modules/data_loaders/json.md b/apps/docs/docs/modules/data_loaders/json.md
index c12767c05d..6f79951694 100644
--- a/apps/docs/docs/modules/data_loaders/json.md
+++ b/apps/docs/docs/modules/data_loaders/json.md
@@ -2,6 +2,7 @@
 
 A simple JSON data loader with various options.
 Either parses the entire string, cleaning it and treat each line as an embedding or performs a recursive depth-first traversal yielding JSON paths.
+Supports streaming of large JSON data using [@discoveryjs/json-ext](https://github.com/discoveryjs/json-ext).
 
 ## Usage
 
@@ -20,12 +21,16 @@
 const docsFromContent = reader.loadDataAsContent(content);
 
 Basic:
 
+- `streamingThreshold?`: The threshold for using streaming mode, given in MB of JSON data. Estimates the character count as `(streamingThreshold * 1024 * 1024) / 2` and compares it against the `.length` of the JSON string. Set `undefined` to disable streaming or `0` to always use streaming. Default is `50` MB.
+
 - `ensureAscii?`: Wether to ensure only ASCII characters be present in the output by converting non-ASCII characters to their unicode escape sequence. Default is `false`.
 
-- `isJsonLines?`: Wether the JSON is in JSON Lines format. If true, will split into lines, remove empty one and parse each line as JSON. Default is `false`
+- `isJsonLines?`: Whether the JSON is in JSON Lines format. If true, splits into lines, removes empty ones, and parses each line as JSON. Note: Uses a custom streaming parser, most likely less robust than json-ext. Default is `false`.
 
 - `cleanJson?`: Whether to clean the JSON by filtering out structural characters (`{}, [], and ,`). If set to false, it will just parse the JSON, not removing structural characters. Default is `true`.
 
+- `logger?`: A custom logger function. Defaults to a console logger.
+
 Depth-First-Traversal:
 
 - `levelsBack?`: Specifies how many levels up the JSON structure to include in the output. `cleanJson` will be ignored. If set to 0, all levels are included. If undefined, parses the entire JSON, treat each line as an embedding and create a document per top-level array. Default is `undefined`
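For reference, a short usage sketch combining the options documented above (the option values are illustrative, not recommendations):

```ts
import { JSONReader } from "llamaindex";

// Hypothetical setup: stream anything over ~10 MB, parse as JSON Lines,
// and keep structural characters in the output.
const reader = new JSONReader({
  streamingThreshold: 10,
  isJsonLines: true,
  cleanJson: false,
});

const content = new TextEncoder().encode('{"a": 1}\n{"b": 2}\n');
const docs = await reader.loadDataAsContent(content);
console.log(docs.length); // 2 (one Document per JSON Lines entry)
```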
diff --git a/examples/readers/package.json b/examples/readers/package.json
index e430321d15..c741b1214f 100644
--- a/examples/readers/package.json
+++ b/examples/readers/package.json
@@ -14,7 +14,8 @@
     "start:assemblyai": "node --import tsx ./src/assemblyai.ts",
     "start:llamaparse-dir": "node --import tsx ./src/simple-directory-reader-with-llamaparse.ts",
     "start:llamaparse-json": "node --import tsx ./src/llamaparse-json.ts",
-    "start:discord": "node --import tsx ./src/discord.ts"
+    "start:discord": "node --import tsx ./src/discord.ts",
+    "start:json": "node --import tsx ./src/json.ts"
   },
   "dependencies": {
     "llamaindex": "*"
diff --git a/packages/llamaindex/package.json b/packages/llamaindex/package.json
index 6d846b31cb..f253bb86d3 100644
--- a/packages/llamaindex/package.json
+++ b/packages/llamaindex/package.json
@@ -25,6 +25,7 @@
     "@azure/identity": "^4.4.1",
     "@datastax/astra-db-ts": "^1.4.1",
     "@discordjs/rest": "^2.3.0",
+    "@discoveryjs/json-ext": "^0.6.1",
     "@google-cloud/vertexai": "1.2.0",
     "@google/generative-ai": "0.12.0",
     "@grpc/grpc-js": "^1.11.1",
diff --git a/packages/llamaindex/src/readers/JSONReader.ts b/packages/llamaindex/src/readers/JSONReader.ts
index 2f4f99a9f0..b1894e2a23 100644
--- a/packages/llamaindex/src/readers/JSONReader.ts
+++ b/packages/llamaindex/src/readers/JSONReader.ts
@@ -1,9 +1,26 @@
+import { parseChunked } from "@discoveryjs/json-ext";
 import type { JSONValue } from "@llamaindex/core/global";
 import { Document, FileReader } from "@llamaindex/core/schema";
+import { type Logger, consoleLogger } from "../internal/logger.js";
+
+// Possible improvements:
+// - use `json-ext` for streaming JSON.stringify. Currently once JSON.stringify is called, the data is already chunked, so there should be no high risk of memory issues
+// --> json-ext can use `stringifyInfo` to get the minimum byte lengths as well as return any circular references found, which could be used to avoid erroring on circular references during stringification
+
 export interface JSONReaderOptions {
+  /**
+   * The threshold for using streaming mode, given as the approximate size of the JSON data in MB.
+   * Estimates the character count as `(streamingThreshold * 1024 * 1024) / 2` and compares it against `string.length`.
+   * Streaming mode avoids memory issues when parsing large JSON data. Set `undefined` to disable streaming or `0` to always use streaming.
+   *
+   * @default 50 MB
+   */
+  streamingThreshold?: number;
+
   /**
    * Whether to ensure only ASCII characters.
    * Converts non-ASCII characters to their unicode escape sequence.
+   *
    * @default false
    */
   ensureAscii?: boolean;
@@ -11,6 +28,8 @@ export interface JSONReaderOptions {
   /**
    * Whether the JSON is in JSON Lines format.
    * Split into lines, remove empty lines, parse each line as JSON.
+   * Note: Uses a custom streaming parser, most likely less robust than json-ext.
+   *
    * @default false
    */
   isJsonLines?: boolean;
@@ -18,6 +37,7 @@ export interface JSONReaderOptions {
   /**
    * Whether to clean the JSON by filtering out structural characters (`{}, [], and ,`).
    * If set to false, it will just parse the JSON, not removing structural characters.
+   *
    * @default true
    */
   cleanJson?: boolean;
@@ -25,6 +45,7 @@ export interface JSONReaderOptions {
   /**
    * Specifies how many levels up the JSON structure to include in the output. cleanJson will be ignored.
    * If set to 0, all levels are included. If undefined, parses the entire JSON and treats each line as an embedding.
+   *
    * @default undefined
    */
   levelsBack?: number;
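The character estimate divides the byte budget by two, reflecting the two bytes a JavaScript string character typically occupies in memory. A minimal sketch of the arithmetic, using the default threshold:

```ts
// Default threshold: 50 MB of JSON data.
const streamingThreshold = 50;

// Bytes divided by 2 gives the estimated character limit.
const limit = (streamingThreshold * 1024 * 1024) / 2; // 26_214_400 characters

// A decoded JSON string longer than the limit is routed to the streaming parser.
const jsonStr = '{"tiny": true}';
const shouldStream = jsonStr.length > limit; // false for this payload
```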
@@ -32,40 +53,146 @@ export interface JSONReaderOptions {
   /**
    * The maximum length of JSON string representation to be collapsed into a single line.
    * Only applicable when `levelsBack` is set.
+   *
    * @default undefined
    */
   collapseLength?: number;
+
+  /**
+   * A placeholder for a custom logging function.
+   *
+   * @default consoleLogger
+   */
+  logger?: Logger;
 }
 
 export class JSONReaderError extends Error {}
 export class JSONParseError extends JSONReaderError {}
 export class JSONStringifyError extends JSONReaderError {}
 
+class JSONParser {
+  static async *parse(
+    content: string | Uint8Array,
+    isJsonLines: boolean,
+    isStream: boolean,
+    logger?: Logger,
+  ): AsyncGenerator<JSONValue> {
+    logger?.log(`Parsing JSON ${isJsonLines ? "as JSON Lines" : ""}`);
+    try {
+      if (isStream) {
+        const stream = new ReadableStream({
+          start(controller) {
+            controller.enqueue(content as Uint8Array);
+            controller.close();
+          },
+        });
+        yield* this.parseStream(stream, isJsonLines);
+      } else {
+        yield* this.parseString(content as string, isJsonLines);
+      }
+    } catch (e) {
+      throw new JSONParseError(
+        `Failed to parse JSON ${isJsonLines ? "as JSON Lines" : ""} ${isStream ? "while streaming" : ""}: ${
+          e instanceof Error ? e.message : "Unknown error occurred"
+        }`,
+        { cause: e },
+      );
+    }
+  }
+
+  private static async *parseStream(
+    stream: ReadableStream,
+    isJsonLines: boolean,
+  ) {
+    if (isJsonLines) {
+      yield* this.parseJsonLinesStream(stream);
+    } else {
+      yield await parseChunked(stream);
+    }
+  }
+
+  private static async *parseString(jsonStr: string, isJsonLines: boolean) {
+    if (isJsonLines) {
+      yield* this.parseJsonLines(jsonStr);
+    } else {
+      yield JSON.parse(jsonStr);
+    }
+  }
+
+  private static async *parseJsonLines(
+    jsonStr: string,
+  ): AsyncGenerator<JSONValue> {
+    for (const line of jsonStr.split("\n").filter((l) => l.trim())) {
+      yield JSON.parse(line.trim());
+    }
+  }
+
+  private static async *parseJsonLinesStream(
+    stream: ReadableStream,
+  ): AsyncGenerator<JSONValue> {
+    const reader = stream.getReader();
+    const decoder = new TextDecoder("utf-8");
+    let buffer = "";
+
+    while (true) {
+      const { done, value } = await reader.read();
+      if (done) break;
+
+      buffer += decoder.decode(value, { stream: true });
+      const lines = buffer.split("\n");
+      buffer = lines.pop() || "";
+
+      for (const line of lines) {
+        if (line.trim()) {
+          yield JSON.parse(line.trim());
+        }
+      }
+    }
+
+    if (buffer.trim()) {
+      yield JSON.parse(buffer.trim());
+    }
+  }
+}
+
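The streaming path wraps bytes that are already in memory in a single-chunk `ReadableStream`, which is the input shape `parseChunked` from `@discoveryjs/json-ext` consumes incrementally. A standalone sketch of the same pattern (assuming json-ext ^0.6.1, the version added above):

```ts
import { parseChunked } from "@discoveryjs/json-ext";

// Wrap an in-memory Uint8Array in a one-chunk web ReadableStream,
// mirroring what JSONParser.parse builds before delegating to parseStream.
const bytes = new TextEncoder().encode('{"hello": "world"}');
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue(bytes);
    controller.close();
  },
});

// parseChunked consumes the stream chunk by chunk and resolves with the parsed value.
const value = await parseChunked(stream);
console.log(value); // { hello: "world" }
```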
 /**
  * A reader that reads JSON data and returns an array of Document objects.
  * Supports various options to modify the output.
  */
-export class JSONReader<T extends JSONValue> extends FileReader {
+export class JSONReader extends FileReader {
   private options: JSONReaderOptions;
 
   constructor(options: JSONReaderOptions = {}) {
     super();
-    this.options = {
+    this.options = this.normalizeOptions(options);
+  }
+
+  // Initialize options with validation
+  private normalizeOptions(
+    providedOptions: JSONReaderOptions,
+  ): JSONReaderOptions {
+    const defaultOptions: JSONReaderOptions = {
+      streamingThreshold: 50,
       ensureAscii: false,
       isJsonLines: false,
       cleanJson: true,
-      ...options,
+      logger: consoleLogger,
     };
-    this.validateOptions();
-  }
-  private validateOptions(): void {
-    const { levelsBack, collapseLength } = this.options;
-    if (levelsBack !== undefined && levelsBack < 0) {
-      throw new JSONReaderError("levelsBack must not be negative");
-    }
-    if (collapseLength !== undefined && collapseLength < 0) {
-      throw new JSONReaderError("collapseLength must not be negative");
+
+    const options = { ...defaultOptions, ...providedOptions };
+
+    // Validation logic for numeric options
+    const numericOptions: (keyof JSONReaderOptions)[] = [
+      "streamingThreshold",
+      "collapseLength",
+      "levelsBack",
+    ];
+    for (const key of numericOptions) {
+      if (typeof options[key] === "number" && options[key]! < 0) {
+        throw new JSONReaderError(`${key} must not be negative`);
+      }
     }
+
+    return options;
   }
 
   /**
@@ -74,63 +201,47 @@ export class JSONReader<T extends JSONValue> extends FileReader {
    * @param {Uint8Array} content - The JSON data as a Uint8Array.
    * @return {Promise<Document[]>} A Promise that resolves to an array of Document objects.
    */
-  async loadDataAsContent(content: Uint8Array): Promise<Document[]> {
-    const jsonStr = new TextDecoder("utf-8").decode(content);
-    const parser = this.parseJsonString(jsonStr);
+  public async loadDataAsContent(content: Uint8Array): Promise<Document[]> {
     const documents: Document[] = [];
-    for await (const data of parser) {
-      documents.push(await this.createDocument(data));
+    for await (const document of this.parseJson(content)) {
+      documents.push(document);
     }
+
     return documents;
   }
 
-  private async *parseJsonString(jsonStr: string): AsyncGenerator<T> {
-    if (this.options.isJsonLines) {
-      yield* this.parseJsonLines(jsonStr);
-    } else {
-      yield* this.parseJson(jsonStr);
-    }
-  }
+  private async *parseJson(content: Uint8Array): AsyncGenerator<Document> {
+    const jsonStr = new TextDecoder("utf-8").decode(content);
+    const limit =
+      ((this.options.streamingThreshold ?? Infinity) * 1024 * 1024) / 2;
+    const shouldStream = jsonStr.length > limit;
 
-  private async *parseJsonLines(jsonStr: string): AsyncGenerator<T> {
-    // Process each line as a separate JSON object for JSON Lines format
-    for (const line of jsonStr.split("\n")) {
-      if (line.trim() !== "") {
-        try {
-          yield JSON.parse(line.trim());
-        } catch (e) {
-          throw new JSONParseError(
-            `Error parsing JSON Line: ${e} in "${line.trim()}"`,
-          );
-        }
-      }
-    }
-  }
+    const parsedData = JSONParser.parse(
+      shouldStream ? content : jsonStr,
+      this.options.isJsonLines ?? false,
+      shouldStream,
+      this.options.logger,
+    );
 
-  private async *parseJson(jsonStr: string): AsyncGenerator<T> {
-    try {
-      // TODO: Add streaming to handle large JSON files
-      const parsedData = JSON.parse(jsonStr);
-
-      if (!this.options.cleanJson) {
-        // Yield the parsed data directly if cleanJson is false
-        yield parsedData;
-      } else if (Array.isArray(parsedData)) {
-        // Check if it's an Array, if so yield each item seperately, i.e. create a document per top-level array of the json
-        for (const item of parsedData) {
-          yield item;
-        }
+    this.options.logger?.log(
+      `Using ${shouldStream ? "streaming parser" : "JSON.parse"} as string length ${shouldStream ? "exceeds" : "is less than"} calculated character limit: "${limit}"`,
+    );
+
+    for await (const value of parsedData) {
+      // Yield the parsed data directly if cleanJson is false or the value is not an array.
+      if (!this.options.cleanJson || !Array.isArray(value)) {
+        yield await this.createDocument(value as JSONValue);
       } else {
-        // If not an array, just yield the parsed data
-        yield parsedData;
+        // If it's an Array, yield each item separately, i.e. create a document per top-level array of the JSON
+        for (const item of value) {
+          yield await this.createDocument(item as JSONValue);
+        }
       }
-    } catch (e) {
-      throw new JSONParseError(`Error parsing JSON: ${e} in "${jsonStr}"`);
     }
   }
 
-  private async createDocument(data: T): Promise<Document> {
+  private async createDocument(data: JSONValue): Promise<Document> {
     const docText: string =
       this.options.levelsBack === undefined
         ? this.formatJsonString(data)
         : await this.prepareDepthFirstYield(data);
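Because `cleanJson` defaults to `true`, a top-level array is split into one `Document` per element, while any other top-level value yields a single `Document`. A sketch of the observable behavior (inputs are illustrative):

```ts
import { JSONReader } from "llamaindex";

const reader = new JSONReader(); // cleanJson defaults to true

// Top-level array: each element becomes its own Document.
const arrayDocs = await reader.loadDataAsContent(
  new TextEncoder().encode('[{"id": 1}, {"id": 2}, {"id": 3}]'),
);
console.log(arrayDocs.length); // 3

// Top-level object: a single Document is produced.
const objectDocs = await reader.loadDataAsContent(
  new TextEncoder().encode('{"id": 1}'),
);
console.log(objectDocs.length); // 1
```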
@@ -148,24 +259,10 @@ export class JSONReader<T extends JSONValue> extends FileReader {
     });
   }
 
-  private async prepareDepthFirstYield(data: T): Promise<string> {
-    const levelsBack = this.options.levelsBack ?? 0;
-    const results: string[] = [];
-    for await (const value of this.depthFirstYield(
-      data,
-      levelsBack === 0 ? Infinity : levelsBack,
-      [],
-      this.options.collapseLength,
-    )) {
-      results.push(value);
-    }
-    return results.join("\n");
-  }
-
   // Note: JSON.stringify does not differentiate between indent "undefined/null"(= no whitespaces) and "0"(= no whitespaces, but linebreaks)
   // as python json.dumps does. Thats why we use indent 1 and remove the leading spaces.
-  private formatJsonString(data: T): string {
+  private formatJsonString(data: JSONValue): string {
     try {
       const jsonStr = JSON.stringify(
         data,
@@ -173,70 +270,102 @@
         this.options.cleanJson ? 1 : 0,
       );
       if (this.options.cleanJson) {
-        // Clean JSON by removing structural characters and unnecessary whitespace
         return jsonStr
           .split("\n")
           .filter((line) => !/^[{}\[\],]*$/.test(line.trim()))
-          .map((line) => line.trimStart()) // Removes the indent
+          .map((line) => line.trimStart())
           .join("\n");
       }
       return jsonStr;
     } catch (e) {
       throw new JSONStringifyError(
-        `Error stringifying JSON: ${e} in "${JSON.stringify(data)}"`,
+        `Error stringifying JSON data: ${
+          e instanceof Error ? e.message : "Unknown error occurred"
+        }`,
+        { cause: e },
       );
     }
   }
 
+  private async prepareDepthFirstYield(data: JSONValue): Promise<string> {
+    const levelsBack = this.options.levelsBack ?? 0;
+    const results: string[] = [];
+
+    for await (const value of this.depthFirstYield(
+      data,
+      levelsBack === 0 ? Infinity : levelsBack,
+      [],
+      this.options.collapseLength,
+    )) {
+      results.push(value);
+    }
+    return results.join("\n");
+  }
+
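The cleanup in `formatJsonString` relies on `JSON.stringify` with an indent of 1: every key lands on its own line, so lines consisting only of structural characters can be filtered out and the one-space indent trimmed. A minimal sketch of the same transformation:

```ts
// Mirrors formatJsonString's cleanJson path on a small value.
const data = { a: 1, b: { c: 2 } };

const cleaned = JSON.stringify(data, null, 1)
  .split("\n")
  .filter((line) => !/^[{}\[\],]*$/.test(line.trim())) // drop purely structural lines
  .map((line) => line.trimStart()) // drop the indent
  .join("\n");

console.log(cleaned);
// "a": 1,
// "b": {
// "c": 2
```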
   /**
    * A generator function that determines the next step in traversing the JSON data.
-   * If the serialized JSON string is not null, it yields the string and returns.
-   * If the JSON data is an object, it delegates the traversal to the depthFirstTraversal method.
-   * Otherwise, it yields the JSON data as a string.
-   *
-   * @param jsonData - The JSON data to traverse.
-   * @param levelsBack - The number of levels up the JSON structure to include in the output.
-   * @param path - The current path in the JSON structure.
-   * @param collapseLength - The maximum length of JSON string representation to be collapsed into a single line.
-   * @throws {JSONReaderError} - Throws an error if there is an issue during the depth-first traversal.
+   * If collapseLength is set and the serialized JSON string is not null, it yields the collapsed string.
+   * Otherwise it delegates traversal to the `traverseJsonData` function.
    */
   private async *depthFirstYield(
-    jsonData: T,
+    jsonData: JSONValue,
     levelsBack: number,
     path: string[],
     collapseLength?: number,
   ): AsyncGenerator<string> {
-    try {
-      const jsonStr = this.serializeAndCollapse(
-        jsonData,
-        levelsBack,
-        path,
-        collapseLength,
-      );
-      if (jsonStr !== null) {
-        yield jsonStr;
-        return;
-      }
+    const jsonStr = this.serializeAndCollapse(
+      jsonData,
+      levelsBack,
+      path,
+      collapseLength,
+    );
+    if (jsonStr !== null) {
+      yield jsonStr;
+    } else {
+      yield* this.traverseJsonData(jsonData, levelsBack, path, collapseLength);
+    }
+  }
 
-      if (jsonData !== null && typeof jsonData === "object") {
-        yield* this.depthFirstTraversal(
-          jsonData,
-          levelsBack,
-          path,
-          collapseLength,
-        );
-      } else {
-        yield `${path.slice(-levelsBack).join(" ")} ${String(jsonData)}`;
+  /**
+   * Traverse the JSON data.
+   * If the data is an array, it traverses each item in the array.
+   * If the data is an object, it delegates traversal to the `traverseObject` function.
+   * If the data is a primitive value, it yields the value with the path.
+   *
+   */
+  private async *traverseJsonData(
+    jsonData: JSONValue,
+    levelsBack: number,
+    path: string[],
+    collapseLength?: number,
+  ): AsyncGenerator<string> {
+    if (Array.isArray(jsonData)) {
+      for (const item of jsonData) {
+        yield* this.depthFirstYield(item, levelsBack, path, collapseLength);
       }
-    } catch (e) {
-      throw new JSONReaderError(
-        `Error during depth first traversal at path ${path.join(" ")}: ${e}`,
-      );
+    } else if (jsonData !== null && typeof jsonData === "object") {
+      yield* this.traverseObject(jsonData, levelsBack, path, collapseLength);
+    } else {
+      yield `${path.slice(-levelsBack).join(" ")} ${String(jsonData)}`;
+    }
+  }
+
+  private async *traverseObject(
+    jsonObject: Record<string, JSONValue>,
+    levelsBack: number,
+    path: string[],
+    collapseLength?: number,
+  ): AsyncGenerator<string> {
+    const originalLength = path.length;
+    for (const [key, value] of Object.entries(jsonObject)) {
+      path.push(key);
+      yield* this.depthFirstYield(value, levelsBack, path, collapseLength);
+      path.length = originalLength; // Reset path length to original. Avoids cloning the path array every time.
     }
   }
 
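With `levelsBack` set, each primitive leaf is emitted prefixed by the last N segments of its path, one line per leaf. A worked example of the resulting document text (input data is illustrative):

```ts
import { JSONReader } from "llamaindex";

// Keep the last two path segments in front of every leaf value.
const reader = new JSONReader({ levelsBack: 2 });

const docs = await reader.loadDataAsContent(
  new TextEncoder().encode(
    '{"user": {"name": "alice", "address": {"city": "berlin"}}}',
  ),
);

console.log(docs[0]?.text);
// user name alice
// address city berlin
```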
   private serializeAndCollapse(
-    jsonData: T,
+    jsonData: JSONValue,
     levelsBack: number,
     path: string[],
     collapseLength?: number,
@@ -247,56 +376,17 @@
   ): string | null {
     try {
       const jsonStr = JSON.stringify(jsonData);
       return collapseLength !== undefined && jsonStr.length <= collapseLength
         ? `${path.slice(-levelsBack).join(" ")} ${jsonStr}`
         : null;
     } catch (e) {
-      throw new JSONStringifyError(`Error stringifying JSON data: ${e}`);
-    }
-  }
-  /**
-   * A generator function that performs a depth-first traversal of the JSON data.
-   * If the JSON data is an array, it traverses each item in the array.
-   * If the JSON data is an object, it traverses each key-value pair in the object.
-   * For each traversed item or value, it performs a depth-first yield.
-   *
-   * @param jsonData - The JSON data to traverse.
-   * @param levelsBack - The number of levels up the JSON structure to include in the output.
-   * @param path - The current path in the JSON structure.
-   * @param collapseLength - The maximum length of JSON string representation to be collapsed into a single line.
-   * @throws {JSONReaderError} - Throws an error if there is an issue during the depth-first traversal of the object.
-   */
-  private async *depthFirstTraversal(
-    jsonData: T,
-    levelsBack: number,
-    path: string[],
-    collapseLength?: number,
-  ): AsyncGenerator<string> {
-    try {
-      if (Array.isArray(jsonData)) {
-        for (const item of jsonData) {
-          yield* this.depthFirstYield(item, levelsBack, path, collapseLength);
-        }
-      } else if (jsonData !== null && typeof jsonData === "object") {
-        const originalLength = path.length;
-        for (const [key, value] of Object.entries(jsonData)) {
-          path.push(key);
-          if (value !== null) {
-            yield* this.depthFirstYield(
-              value as T,
-              levelsBack,
-              path,
-              collapseLength,
-            );
-          }
-          path.length = originalLength; // Reset path length to original. Avoids cloning the path array every time.
-        }
-      }
-    } catch (e) {
-      throw new JSONReaderError(
-        `Error during depth-first traversal of object: ${e}`,
+      throw new JSONStringifyError(
+        `Error stringifying JSON data: ${
+          e instanceof Error ? e.message : "Unknown error occurred"
+        }`,
+        { cause: e },
       );
     }
   }
 
   private convertToAscii(str: string): string {
-    return str.replace(
+    return str.replaceAll(
       /[\u007F-\uFFFF]/g,
       (char) => `\\u${char.charCodeAt(0).toString(16).padStart(4, "0")}`,
     );
diff --git a/packages/llamaindex/tests/readers/JSONReader.test.ts b/packages/llamaindex/tests/readers/JSONReader.test.ts
index 7673baef63..36e537bff0 100644
--- a/packages/llamaindex/tests/readers/JSONReader.test.ts
+++ b/packages/llamaindex/tests/readers/JSONReader.test.ts
@@ -1,9 +1,4 @@
-import {
-  JSONParseError,
-  JSONReader,
-  JSONReaderError,
-  type JSONValue,
-} from "llamaindex";
+import { JSONParseError, JSONReader, JSONReaderError } from "llamaindex";
 import { beforeEach, describe, expect, it } from "vitest";
 
 const content = new TextEncoder().encode(
@@ -11,7 +6,7 @@ const content = new TextEncoder().encode(
 );
 
 describe("JSONReader", () => {
-  let reader: JSONReader<JSONValue>;
+  let reader: JSONReader;
 
   beforeEach(() => {
     reader = new JSONReader();
@@ -19,7 +14,8 @@ describe("JSONReader", () => {
 
   describe("constructor", () => {
     it("should set default options", () => {
-      expect(reader["options"]).toEqual({
+      expect(reader["options"]).toMatchObject({
+        streamingThreshold: 50,
         ensureAscii: false,
         isJsonLines: false,
         cleanJson: true,
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 0dc87238ce..8087ea0b3b 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -481,6 +481,9 @@ importers:
       '@discordjs/rest':
        specifier: ^2.3.0
        version: 2.3.0
+      '@discoveryjs/json-ext':
+        specifier: ^0.6.1
+        version: 0.6.1
       '@google-cloud/vertexai':
         specifier: 1.2.0
         version: 1.2.0(encoding@0.1.13)
@@ -2047,6 +2050,10 @@ packages:
     resolution: {integrity: sha512-dBVuXR082gk3jsFp7Rd/JI4kytwGHecnCoTtXFb7DB6CNHp4rg5k1bhg0nWdLGLnOV71lmDzGQaLMy8iPLY0pw==}
     engines: {node: '>=10.0.0'}
 
+  '@discoveryjs/json-ext@0.6.1':
+    resolution: {integrity: sha512-boghen8F0Q8D+0/Q1/1r6DUEieUJ8w2a1gIknExMSHBsJFOr2+0KUfHiVYBvucPwl3+RU5PFBK833FjFCh3BhA==}
+    engines: {node: '>=14.17.0'}
+
   '@docsearch/css@3.6.1':
     resolution: {integrity: sha512-VtVb5DS+0hRIprU2CO6ZQjK2Zg4QU5HrDM1+ix6rT0umsYvFvatMAnf97NHZlVWDaaLlx7GRfR/7FikANiM2Fg==}
 
@@ -13048,6 +13055,8 @@ snapshots:
 
   '@discoveryjs/json-ext@0.5.7': {}
 
+  '@discoveryjs/json-ext@0.6.1': {}
+
   '@docsearch/css@3.6.1': {}
 
   '@docsearch/react@3.6.1(@algolia/client-search@5.2.3)(@types/react@18.3.5)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(search-insights@2.17.0)':
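For reference, the `convertToAscii` change above (`replace` to `replaceAll`, which behaves identically with a global regex) escapes every character outside the 7-bit ASCII range. A minimal standalone sketch of the same transformation:

```ts
// Mirrors convertToAscii: escape all characters from U+007F upwards.
const convertToAscii = (str: string): string =>
  str.replaceAll(
    /[\u007F-\uFFFF]/g,
    (char) => `\\u${char.charCodeAt(0).toString(16).padStart(4, "0")}`,
  );

console.log(convertToAscii("héllo wörld"));
// h\u00e9llo w\u00f6rld
```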