introduce experimental batched streaming (#162)
yaacovCR authored Mar 18, 2022
1 parent 6b06772 commit 99a85d4
Showing 5 changed files with 365 additions and 38 deletions.
2 changes: 2 additions & 0 deletions .changeset/giant-llamas-peel.md
@@ -5,3 +5,5 @@
introduce experimental parallel streaming

An experimental `inParallel` boolean argument to the `stream` directive may now be used to stream list items as they become ready, instead of in sequential list order.

When parallel streaming is enabled, the `data` property of execution patch results will consist of an array of items, and a new `atIndices` property will contain the corresponding indices of those items.
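
For illustration only, a minimal sketch of how this might be requested and what the resulting patches could look like (the `fruits` field and its items are hypothetical, not part of this change):

```ts
import { parse } from 'graphql';

// Hypothetical list field `fruits: [String]`; names are illustrative only.
const document = parse(
  '{ fruits @stream(initialCount: 0, inParallel: true) }',
);

// Each patch carries whichever items resolved first, reporting their
// original list positions in `atIndices`, e.g.:
// { data: ['banana'], path: ['fruits'], atIndices: [1], hasNext: true }
// { data: ['apple'], path: ['fruits'], atIndices: [0], hasNext: false }
```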
11 changes: 11 additions & 0 deletions .changeset/thin-maps-boil.md
@@ -0,0 +1,11 @@
---
'graphql-executor': patch
---

introduce experimental batched streaming

Experimental `maxChunkSize` and `maxInterval` arguments allow increasing the number of items in each streamed payload up to the specified maximum size. The maximum interval (specified in milliseconds) can be used to send any ready items before the maximum chunk size has been reached.

When using a `maxChunkSize` greater than 1, the `data` property of execution patch results will consist of an array of items, and a new `atIndex` property will contain the initial index of the items included in the chunk.

These options can be combined with parallel streaming. When streaming in parallel, the `data` property will always consist of an array of items and the `atIndices` property will always consist of an array of the matching indices, even when `maxChunkSize` is equal to 1. If these new arguments prove popular, `data` should probably become an array even when `maxChunkSize` is 1 and parallel streaming is disabled.
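
As a rough sketch (again using a hypothetical `fruits` field), batched streaming might be requested like this:

```ts
import { parse } from 'graphql';

// Hypothetical `fruits: [String]` field, illustrative only: deliver up to
// two items per patch, flushing any ready items at least every 50ms.
const document = parse(
  '{ fruits @stream(maxChunkSize: 2, maxInterval: 50) }',
);

// A possible patch sequence; `atIndex` is the index of the first item in
// each chunk:
// { data: ['apple', 'banana'], path: ['fruits'], atIndex: 0, hasNext: true }
// { data: ['coconut'], path: ['fruits'], atIndex: 2, hasNext: false }
```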
241 changes: 215 additions & 26 deletions src/execution/__tests__/stream-test.ts
@@ -266,6 +266,31 @@ describe('Execute: stream directive', () => {
      },
    ]);
  });
  it('Can stream a list field in chunks of size greater than 1', async () => {
    const document = parse('{ scalarList @stream(maxChunkSize: 2) }');
    const result = await complete(document);

    expect(result).to.deep.equal([
      {
        data: {
          scalarList: [],
        },
        hasNext: true,
      },
      {
        data: ['apple', 'banana'],
        path: ['scalarList'],
        atIndex: 0,
        hasNext: true,
      },
      {
        data: ['coconut'],
        path: ['scalarList'],
        atIndex: 2,
        hasNext: false,
      },
    ]);
  });
  it('Can use default value of initialCount', async () => {
    const document = parse('{ scalarList @stream }');
    const result = await complete(document);
@@ -311,7 +336,52 @@ describe('Execute: stream directive', () => {
    expectJSON(result).toDeepEqual({
      errors: [
        {
-         message: 'initialCount must be a positive integer',
+         message:
+           'initialCount must be an integer greater than or equal to zero',
          locations: [
            {
              line: 1,
              column: 3,
            },
          ],
          path: ['scalarList'],
        },
      ],
      data: {
        scalarList: null,
      },
    });
  });
  it('maxChunkSize values less than one throw field errors', async () => {
    const document = parse('{ scalarList @stream(maxChunkSize: 0) }');
    const result = await complete(document);
    expectJSON(result).toDeepEqual({
      errors: [
        {
          message:
            'maxChunkSize must be an integer greater than or equal to one',
          locations: [
            {
              line: 1,
              column: 3,
            },
          ],
          path: ['scalarList'],
        },
      ],
      data: {
        scalarList: null,
      },
    });
  });
  it('maxInterval values less than zero throw field errors', async () => {
    const document = parse('{ scalarList @stream(maxInterval: -1) }');
    const result = await complete(document);
    expectJSON(result).toDeepEqual({
      errors: [
        {
          message:
            'maxInterval must be an integer greater than or equal to zero',
          locations: [
            {
              line: 1,
@@ -460,27 +530,36 @@ describe('Execute: stream directive', () => {
        hasNext: true,
      },
      {
-       data: {
-         name: 'Han',
-         id: '2',
-       },
-       path: ['asyncSlowList', 1],
+       data: [
+         {
+           name: 'Han',
+           id: '2',
+         },
+       ],
+       path: ['asyncSlowList'],
+       atIndices: [1],
        hasNext: true,
      },
      {
-       data: {
-         name: 'Leia',
-         id: '3',
-       },
-       path: ['asyncSlowList', 2],
+       data: [
+         {
+           name: 'Leia',
+           id: '3',
+         },
+       ],
+       path: ['asyncSlowList'],
+       atIndices: [2],
        hasNext: true,
      },
      {
-       data: {
-         name: 'Luke',
-         id: '1',
-       },
-       path: ['asyncSlowList', 0],
+       data: [
+         {
+           name: 'Luke',
+           id: '1',
+         },
+       ],
+       path: ['asyncSlowList'],
+       atIndices: [0],
        hasNext: false,
      },
    ]);
@@ -622,6 +701,51 @@ describe('Execute: stream directive', () => {
      },
    ]);
  });
  it('Can stream a field that returns an async iterable in chunks of size greater than 1', async () => {
    const document = parse(`
      query {
        asyncIterableList @stream(maxChunkSize: 2) {
          name
          id
        }
      }
    `);
    const result = await complete(document);
    expect(result).to.deep.equal([
      {
        data: {
          asyncIterableList: [],
        },
        hasNext: true,
      },
      {
        data: [
          {
            name: 'Luke',
            id: '1',
          },
          {
            name: 'Han',
            id: '2',
          },
        ],
        path: ['asyncIterableList'],
        atIndex: 0,
        hasNext: true,
      },
      {
        data: [
          {
            name: 'Leia',
            id: '3',
          },
        ],
        path: ['asyncIterableList'],
        atIndex: 2,
        hasNext: false,
      },
    ]);
  });
  it('Can stream a field that returns an async iterable, using a non-zero initialCount', async () => {
    const document = parse(`
      query {
@@ -700,7 +824,8 @@ describe('Execute: stream directive', () => {
    expectJSON(result).toDeepEqual({
      errors: [
        {
-         message: 'initialCount must be a positive integer',
+         message:
+           'initialCount must be an integer greater than or equal to zero',
          locations: [
            {
              line: 3,
@@ -930,10 +1055,10 @@ describe('Execute: stream directive', () => {
      },
    ]);
  });
- it('Handles null returned in non-null async iterable list items after initialCount is reached with parallel streaming', async () => {
+ it('Handles null returned in non-null async iterable list items after initialCount is reached with maxChunkSize greater than 1', async () => {
    const document = parse(`
      query {
-       asyncIterableNonNullError @stream(initialCount: 0, inParallel: true) {
+       asyncIterableNonNullError @stream(initialCount: 0, maxChunkSize: 2) {
          name
        }
      }
@@ -947,15 +1072,19 @@
        hasNext: true,
      },
      {
-       data: {
-         name: 'Luke',
-       },
-       path: ['asyncIterableNonNullError', 0],
+       data: [
+         {
+           name: 'Luke',
+         },
+       ],
+       path: ['asyncIterableNonNullError'],
+       atIndex: 0,
        hasNext: true,
      },
      {
        data: null,
-       path: ['asyncIterableNonNullError', 1],
+       path: ['asyncIterableNonNullError'],
+       atIndex: 1,
        errors: [
          {
            message:
@@ -971,11 +1100,71 @@
        ],
        hasNext: true,
      },
      {
-       data: {
-         name: 'Han',
-       },
-       path: ['asyncIterableNonNullError', 2],
+       data: [
+         {
+           name: 'Han',
+         },
+       ],
+       path: ['asyncIterableNonNullError'],
+       atIndex: 2,
        hasNext: false,
      },
    ]);
  });
  it('Handles null returned in non-null async iterable list items after initialCount is reached with parallel streaming', async () => {
    const document = parse(`
      query {
        asyncIterableNonNullError @stream(initialCount: 0, inParallel: true) {
          name
        }
      }
    `);
    const result = await complete(document);
    expectJSON(result).toDeepEqual([
      {
        data: {
          asyncIterableNonNullError: [],
        },
        hasNext: true,
      },
      {
        data: [
          {
            name: 'Luke',
          },
        ],
        path: ['asyncIterableNonNullError'],
        atIndices: [0],
        hasNext: true,
      },
      {
        data: null,
        path: ['asyncIterableNonNullError'],
        atIndices: [1],
        errors: [
          {
            message:
              'Cannot return null for non-nullable field Query.asyncIterableNonNullError.',
            locations: [
              {
                line: 3,
                column: 9,
              },
            ],
            path: ['asyncIterableNonNullError', 1],
          },
        ],
        hasNext: true,
      },
      {
        data: [
          {
            name: 'Han',
          },
        ],
        path: ['asyncIterableNonNullError'],
        atIndices: [2],
        hasNext: false,
      },
    ]);