From 99a85d47808958403f0519db6b62cbd1a179c2b3 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Fri, 18 Mar 2022 16:12:12 +0200 Subject: [PATCH] introduce experimental batched streaming (#162) --- .changeset/giant-llamas-peel.md | 2 + .changeset/thin-maps-boil.md | 11 ++ src/execution/__tests__/stream-test.ts | 241 ++++++++++++++++++++++--- src/execution/executor.ts | 139 ++++++++++++-- src/type/directives.ts | 10 + 5 files changed, 365 insertions(+), 38 deletions(-) create mode 100644 .changeset/thin-maps-boil.md diff --git a/.changeset/giant-llamas-peel.md b/.changeset/giant-llamas-peel.md index 928fcfd21d..ff25dbb009 100644 --- a/.changeset/giant-llamas-peel.md +++ b/.changeset/giant-llamas-peel.md @@ -5,3 +5,5 @@ introduce experimental parallel streaming Experimental `inParallel` boolean argument to the stream directive may now be used to stream list items as they are ready instead of in sequential list order. + +When parallel streaming is enabled, the `data` property of execution patch results will consist of an array of items and a new `atIndices` property will contain the corresponding indices of the items. diff --git a/.changeset/thin-maps-boil.md b/.changeset/thin-maps-boil.md new file mode 100644 index 0000000000..4e7ba48ad0 --- /dev/null +++ b/.changeset/thin-maps-boil.md @@ -0,0 +1,11 @@ +--- +'graphql-executor': patch +--- + +introduce experimental batched streaming + +Experimental `maxChunkSize` and `maxInterval` arguments allows for increasing the number of items in each streamed payload up to the specified maximum size. A maximum interval (specified in milliseconds) can be used to send any ready items prior to the maximum chunk size. + +When using a `maxChunkSize` greater than 1, the `data` property of execution patch results will consist of an array of items and a new `atIndex` property will contain the initial index for the items included within the chunk. + +These options can be combined with parallel streaming. When streaming in parallel, the `data` property will always consist on an array of items and the `atIndices` property will always consist of an array of the matching indices, even when `maxChunkSize` is equal to 1. If these new arguments prove popular, `data` should probably be an array even when `maxChunkSize` is equal to one, even without parallel streaming. diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 849f710d49..eaeec6c59b 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -266,6 +266,31 @@ describe('Execute: stream directive', () => { }, ]); }); + it('Can stream a list field in chunks of size greater than 1', async () => { + const document = parse('{ scalarList @stream(maxChunkSize: 2) }'); + const result = await complete(document); + + expect(result).to.deep.equal([ + { + data: { + scalarList: [], + }, + hasNext: true, + }, + { + data: ['apple', 'banana'], + path: ['scalarList'], + atIndex: 0, + hasNext: true, + }, + { + data: ['coconut'], + path: ['scalarList'], + atIndex: 2, + hasNext: false, + }, + ]); + }); it('Can use default value of initialCount', async () => { const document = parse('{ scalarList @stream }'); const result = await complete(document); @@ -311,7 +336,52 @@ describe('Execute: stream directive', () => { expectJSON(result).toDeepEqual({ errors: [ { - message: 'initialCount must be a positive integer', + message: + 'initialCount must be an integer greater than or equal to zero', + locations: [ + { + line: 1, + column: 3, + }, + ], + path: ['scalarList'], + }, + ], + data: { + scalarList: null, + }, + }); + }); + it('maxChunkSize values less than one throw field errors', async () => { + const document = parse('{ scalarList @stream(maxChunkSize: 0) }'); + const result = await complete(document); + expectJSON(result).toDeepEqual({ + errors: [ + { + message: + 'maxChunkSize must be an integer greater than or equal to one', + locations: [ + { + line: 1, + column: 3, + }, + ], + path: ['scalarList'], + }, + ], + data: { + scalarList: null, + }, + }); + }); + it('maxInterval values less than zero throw field errors', async () => { + const document = parse('{ scalarList @stream(maxInterval: -1) }'); + const result = await complete(document); + expectJSON(result).toDeepEqual({ + errors: [ + { + message: + 'maxInterval must be an integer greater than or equal to zero', locations: [ { line: 1, @@ -460,27 +530,36 @@ describe('Execute: stream directive', () => { hasNext: true, }, { - data: { - name: 'Han', - id: '2', - }, - path: ['asyncSlowList', 1], + data: [ + { + name: 'Han', + id: '2', + }, + ], + path: ['asyncSlowList'], + atIndices: [1], hasNext: true, }, { - data: { - name: 'Leia', - id: '3', - }, - path: ['asyncSlowList', 2], + data: [ + { + name: 'Leia', + id: '3', + }, + ], + path: ['asyncSlowList'], + atIndices: [2], hasNext: true, }, { - data: { - name: 'Luke', - id: '1', - }, - path: ['asyncSlowList', 0], + data: [ + { + name: 'Luke', + id: '1', + }, + ], + path: ['asyncSlowList'], + atIndices: [0], hasNext: false, }, ]); @@ -622,6 +701,51 @@ describe('Execute: stream directive', () => { }, ]); }); + it('Can stream a field that returns an async iterable in chunks of size greater than 1', async () => { + const document = parse(` + query { + asyncIterableList @stream(maxChunkSize: 2) { + name + id + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncIterableList: [], + }, + hasNext: true, + }, + { + data: [ + { + name: 'Luke', + id: '1', + }, + { + name: 'Han', + id: '2', + }, + ], + path: ['asyncIterableList'], + atIndex: 0, + hasNext: true, + }, + { + data: [ + { + name: 'Leia', + id: '3', + }, + ], + path: ['asyncIterableList'], + atIndex: 2, + hasNext: false, + }, + ]); + }); it('Can stream a field that returns an async iterable, using a non-zero initialCount', async () => { const document = parse(` query { @@ -700,7 +824,8 @@ describe('Execute: stream directive', () => { expectJSON(result).toDeepEqual({ errors: [ { - message: 'initialCount must be a positive integer', + message: + 'initialCount must be an integer greater than or equal to zero', locations: [ { line: 3, @@ -930,10 +1055,10 @@ describe('Execute: stream directive', () => { }, ]); }); - it('Handles null returned in non-null async iterable list items after initialCount is reached with parallel streaming', async () => { + it('Handles null returned in non-null async iterable list items after initialCount is reached with maxChunkSize greater than 1', async () => { const document = parse(` query { - asyncIterableNonNullError @stream(initialCount: 0, inParallel: true) { + asyncIterableNonNullError @stream(initialCount: 0, maxChunkSize: 2) { name } } @@ -947,15 +1072,19 @@ describe('Execute: stream directive', () => { hasNext: true, }, { - data: { - name: 'Luke', - }, - path: ['asyncIterableNonNullError', 0], + data: [ + { + name: 'Luke', + }, + ], + path: ['asyncIterableNonNullError'], + atIndex: 0, hasNext: true, }, { data: null, - path: ['asyncIterableNonNullError', 1], + path: ['asyncIterableNonNullError'], + atIndex: 1, errors: [ { message: @@ -971,11 +1100,71 @@ describe('Execute: stream directive', () => { ], hasNext: true, }, + { + data: [ + { + name: 'Han', + }, + ], + path: ['asyncIterableNonNullError'], + atIndex: 2, + hasNext: false, + }, + ]); + }); + it('Handles null returned in non-null async iterable list items after initialCount is reached with parallel streaming', async () => { + const document = parse(` + query { + asyncIterableNonNullError @stream(initialCount: 0, inParallel: true) { + name + } + } + `); + const result = await complete(document); + expectJSON(result).toDeepEqual([ { data: { - name: 'Han', + asyncIterableNonNullError: [], }, - path: ['asyncIterableNonNullError', 2], + hasNext: true, + }, + { + data: [ + { + name: 'Luke', + }, + ], + path: ['asyncIterableNonNullError'], + atIndices: [0], + hasNext: true, + }, + { + data: null, + path: ['asyncIterableNonNullError'], + atIndices: [1], + errors: [ + { + message: + 'Cannot return null for non-nullable field Query.asyncIterableNonNullError.', + locations: [ + { + line: 3, + column: 9, + }, + ], + path: ['asyncIterableNonNullError', 1], + }, + ], + hasNext: true, + }, + { + data: [ + { + name: 'Han', + }, + ], + path: ['asyncIterableNonNullError'], + atIndices: [2], hasNext: false, }, ]); diff --git a/src/execution/executor.ts b/src/execution/executor.ts index 19110123c9..10aa290109 100644 --- a/src/execution/executor.ts +++ b/src/execution/executor.ts @@ -177,6 +177,8 @@ interface IncrementalResult { responseContext: SubsequentResponseContext; data: unknown; path: Path | undefined; + atIndex?: number; + atIndices?: ReadonlyArray; label: string | undefined; } @@ -243,6 +245,8 @@ export interface ExecutionPatchResult< errors?: ReadonlyArray; data?: TData | null; path?: ReadonlyArray; + atIndex?: number; + atIndices?: ReadonlyArray; label?: string; hasNext: boolean; extensions?: TExtensions; @@ -293,6 +297,8 @@ export type DeferValuesGetter = ( export interface StreamValues { initialCount: number; + maxChunkSize: number; + maxInterval: Maybe; inParallel: boolean; label?: string; } @@ -694,7 +700,8 @@ export class Executor { ): Publisher { return new Publisher({ payloadFromSource: (result, hasNext) => { - const { responseContext, data, path, label } = result; + const { responseContext, data, path, atIndex, atIndices, label } = + result; const errors = []; for (const responseNode of responseContext.responseNodes) { errors.push(...responseNode.errors); @@ -706,6 +713,12 @@ export class Executor { hasNext, }; + if (atIndex != null) { + value.atIndex = atIndex; + } else if (atIndices != null) { + value.atIndices = atIndices; + } + if (label != null) { value.label = label; } @@ -1388,17 +1401,45 @@ export class Executor { return; } - const { initialCount, inParallel, label } = stream; + const { initialCount, maxChunkSize, maxInterval, inParallel, label } = + stream; invariant( typeof initialCount === 'number', 'initialCount must be a number', ); - invariant(initialCount >= 0, 'initialCount must be a positive integer'); + invariant( + initialCount >= 0, + 'initialCount must be an integer greater than or equal to zero', + ); + + invariant( + typeof maxChunkSize === 'number', + 'maxChunkSize must be a number', + ); + + invariant( + maxChunkSize >= 1, + 'maxChunkSize must be an integer greater than or equal to one', + ); + + if (maxInterval != null) { + invariant( + typeof maxInterval === 'number', + 'maxInterval must be a number', + ); + + invariant( + maxInterval >= 0, + 'maxInterval must be an integer greater than or equal to zero', + ); + } return { initialCount, + maxChunkSize, + maxInterval, inParallel: inParallel === true, label: typeof label === 'string' ? label : undefined, }; @@ -1456,6 +1497,8 @@ export class Executor { createStreamContext( exeContext: ExecutionContext, initialCount: number, + maxChunkSize: number, + maxInterval: Maybe, inParallel: boolean, path: Path, label: string | undefined, @@ -1472,8 +1515,8 @@ export class Executor { ParallelStreamResponseContext >({ initialIndex: initialCount, - maxBundleSize: 1, - maxInterval: undefined, + maxBundleSize: maxChunkSize, + maxInterval, createDataBundleContext: () => { exeContext.state.pendingPushes++; return { @@ -1507,8 +1550,9 @@ export class Executor { context.responseNodes, { responseContext: context, - data: context.results[0], - path: addPath(path, context.atIndices[0], undefined), + data: context.results, + path, + atIndices: context.atIndices, label, }, parentResponseNode, @@ -1520,13 +1564,80 @@ export class Executor { { responseContext: context, data: null, - path: addPath(path, context.atIndices[0], undefined), + path, + atIndices: context.atIndices, label, }, parentResponseNode, ); }, }) + : maxChunkSize > 1 + ? getSequentialBundler( + initialCount, + new Bundler< + StreamDataResult, + ResponseNode, + SequentialStreamDataBundleContext, + SequentialStreamResponseContext + >({ + initialIndex: initialCount, + maxBundleSize: maxChunkSize, + maxInterval, + createDataBundleContext: (count) => { + exeContext.state.pendingPushes++; + return { + responseNodes: [], + parentResponseNode, + atIndex: count, + results: [], + }; + }, + createErrorBundleContext: (count) => { + exeContext.state.pendingPushes++; + return { + responseNodes: [], + parentResponseNode, + atIndex: count, + }; + }, + onData: (_index, result, context) => { + exeContext.state.pendingStreamResults--; + context.responseNodes.push(result.responseNode); + context.results.push(result.data); + }, + onError: (_index, responseNode, context) => { + exeContext.state.pendingStreamResults--; + context.responseNodes.push(responseNode); + }, + onDataBundle: (context) => { + exeContext.publisher.queue( + context.responseNodes, + { + responseContext: context, + data: context.results, + path, + atIndex: context.atIndex, + label, + }, + parentResponseNode, + ); + }, + onErrorBundle: (context) => { + exeContext.publisher.queue( + context.responseNodes, + { + responseContext: context, + data: null, + path, + atIndex: context.atIndex, + label, + }, + parentResponseNode, + ); + }, + }), + ) : getSequentialBundler( initialCount, new Bundler< @@ -1536,8 +1647,8 @@ export class Executor { SequentialStreamResponseContext >({ initialIndex: initialCount, - maxBundleSize: 1, - maxInterval: undefined, + maxBundleSize: maxChunkSize, + maxInterval, createDataBundleContext: (count) => { exeContext.state.pendingPushes++; return { @@ -1615,10 +1726,12 @@ export class Executor { let index = _index; while (true) { if (index >= initialCount) { - const { inParallel, label } = stream; + const { maxChunkSize, maxInterval, inParallel, label } = stream; const streamContext = this.createStreamContext( exeContext, initialCount, + maxChunkSize, + maxInterval, inParallel, path, label, @@ -1777,10 +1890,12 @@ export class Executor { try { while (true) { if (index >= initialCount) { - const { inParallel, label } = stream; + const { maxChunkSize, maxInterval, inParallel, label } = stream; const streamContext = this.createStreamContext( exeContext, initialCount, + maxChunkSize, + maxInterval, inParallel, path, label, diff --git a/src/type/directives.ts b/src/type/directives.ts index a7bcc5ef8a..cfe91af136 100644 --- a/src/type/directives.ts +++ b/src/type/directives.ts @@ -51,6 +51,16 @@ export const GraphQLStreamDirective = new GraphQLDirective({ type: GraphQLInt, description: 'Number of items to return immediately', }, + maxChunkSize: { + defaultValue: 1, + type: GraphQLInt, + description: 'Maximum number of items to return within each payload', + }, + maxInterval: { + type: GraphQLInt, + description: + 'Maximum time in ms to wait to collect items for each payload, will wait indefinitely if undefined', + }, inParallel: { defaultValue: false, type: GraphQLBoolean,