Skip to content

Commit

Permalink
refine filtering
Browse files Browse the repository at this point in the history
We can stop checking for errors that will "filter" child futures once we have reached the point in the path at which the current incrementalContext was branched.
  • Loading branch information
yaacovCR committed Mar 12, 2024
1 parent 2665ebd commit 984fbd3
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 18 deletions.
10 changes: 7 additions & 3 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ export function isDeferredGroupedFieldSetRecord(

export interface IncrementalContext {
deferUsageSet: DeferUsageSet | undefined;
path: Path | undefined;
errors: Array<GraphQLError>;
errorPaths: Set<Path>;
futures: Array<Future>;
Expand Down Expand Up @@ -689,16 +690,17 @@ export class DeferredGroupedFieldSetRecord {

const incrementalContext: IncrementalContext = {
deferUsageSet,
path,
errors: [],
errorPaths: new Set(),
futures: [],
};

for (const deferredFragmentRecord of this.deferredFragmentRecords) {
for (const deferredFragmentRecord of deferredFragmentRecords) {
deferredFragmentRecord.deferredGroupedFieldSetRecords.push(this);
}

this.result = this.deferredFragmentRecords.some(
this.result = deferredFragmentRecords.some(
(deferredFragmentRecord) => deferredFragmentRecord.id !== undefined,
)
? executor(incrementalContext)
Expand Down Expand Up @@ -777,14 +779,16 @@ export class StreamItemsRecord {

constructor(opts: {
streamRecord: StreamRecord;
itemPath?: Path | undefined;
executor: (
incrementalContext: IncrementalContext,
) => PromiseOrValue<StreamItemsResult>;
}) {
const { streamRecord, executor } = opts;
const { streamRecord, itemPath, executor } = opts;
this.streamRecord = streamRecord;
const incrementalContext: IncrementalContext = {
deferUsageSet: undefined,
path: itemPath,
errors: [],
errorPaths: new Set(),
futures: [],
Expand Down
52 changes: 37 additions & 15 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,15 @@ function buildDataResponse(
futures: ReadonlyArray<Future>,
cancellableStreams: Set<StreamRecord>,
): ExecutionResult | ExperimentalIncrementalExecutionResults {
const filteredFutures = filterFutures(errorPaths, futures);
const filteredFutures = filterFutures(undefined, errorPaths, futures);
if (filteredFutures.length > 0) {
return buildIncrementalResponse(data, errors, futures, cancellableStreams);
}
return errors.length > 0 ? { errors, data } : { data };
}

function filterFutures(
initialPath: Path | undefined,
errorPaths: ReadonlySet<Path | undefined>,
futures: ReadonlyArray<Future>,
): ReadonlyArray<Future> {
Expand All @@ -310,19 +311,33 @@ function filterFutures(

const filteredFutures: Array<Future> = [];
for (const future of futures) {
let currentPath = isDeferredGroupedFieldSetRecord(future)
let currentPath: Path | undefined = isDeferredGroupedFieldSetRecord(future)
? future.path
: future.streamRecord.path;
while (currentPath !== undefined) {

if (errorPaths.has(currentPath)) {
continue;
}

const paths: Array<Path | undefined> = [currentPath];
let filtered = false;
while (currentPath !== initialPath) {
// Because currentPath leads to initialPath or is undefined, and the
// loop will exit if initialPath is undefined, currentPath must be
// defined.
// TODO: Consider, however, adding an invariant.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
currentPath = currentPath!.prev;
if (errorPaths.has(currentPath)) {
filtered = true;
break;
}
currentPath = currentPath.prev;
paths.push(currentPath);
}
if (errorPaths.has(currentPath)) {
continue;

if (!filtered) {
filteredFutures.push(future);
}
filteredFutures.push(future);
}

return filteredFutures;
Expand Down Expand Up @@ -1966,7 +1981,7 @@ function executeDeferredGroupedFieldSet(
deferredFragmentRecords,
path: pathToArray(path),
data: resolved,
futures: filterFutures(errorPaths, futures),
futures: filterFutures(path, errorPaths, futures),
errors,
}),
(error) => {
Expand All @@ -1985,7 +2000,7 @@ function executeDeferredGroupedFieldSet(
deferredFragmentRecords,
path: pathToArray(path),
data,
futures: filterFutures(errorPaths, futures),
futures: filterFutures(path, errorPaths, futures),
errors,
};
}
Expand Down Expand Up @@ -2015,6 +2030,7 @@ function firstSyncStreamItems(

const firstStreamItems = new StreamItemsRecord({
streamRecord,
itemPath: initialPath,
executor: (incrementalContext) =>
Promise.resolve().then(() => {
const firstResult = executor(
Expand All @@ -2033,6 +2049,7 @@ function firstSyncStreamItems(

const nextStreamItems = new StreamItemsRecord({
streamRecord,
itemPath: currentPath,
executor: (nextIncrementalContext) =>
executor(currentPath, item, nextIncrementalContext),
});
Expand Down Expand Up @@ -2065,8 +2082,10 @@ function firstAsyncStreamItems(
incrementalContext: IncrementalContext,
) => PromiseOrValue<StreamItemsResult>,
): StreamItemsRecord {
const initialPath = addPath(path, initialIndex, undefined);
const firstStreamItems: StreamItemsRecord = new StreamItemsRecord({
streamRecord,
itemPath: initialPath,
executor: (incrementalContext) =>
Promise.resolve().then(() =>
getNextAsyncStreamItemsResult(
Expand Down Expand Up @@ -2120,7 +2139,8 @@ async function getNextAsyncStreamItemsResult(
const nextStreamItems: StreamItemsRecord = nextAsyncStreamItems(
streamRecord,
path,
index + 1,
itemPath,
index,
nodes,
asyncIterator,
executor,
Expand All @@ -2133,6 +2153,7 @@ async function getNextAsyncStreamItemsResult(
function nextAsyncStreamItems(
streamRecord: StreamRecord,
path: Path,
initialPath: Path,
initialIndex: number,
nodes: ReadonlyArray<FieldNode>,
asyncIterator: AsyncIterator<unknown>,
Expand All @@ -2144,13 +2165,14 @@ function nextAsyncStreamItems(
): StreamItemsRecord {
const nextStreamItems: StreamItemsRecord = new StreamItemsRecord({
streamRecord,
itemPath: initialPath,
executor: (incrementalContext) =>
Promise.resolve().then(() =>
getNextAsyncStreamItemsResult(
streamRecord,
nextStreamItems,
path,
initialIndex,
initialIndex + 1,
incrementalContext,
nodes,
asyncIterator,
Expand All @@ -2171,7 +2193,7 @@ function completeStreamItems(
info: GraphQLResolveInfo,
itemType: GraphQLOutputType,
): PromiseOrValue<StreamItemsResult> {
const { errors, errorPaths, futures } = incrementalContext;
const { path, errors, errorPaths, futures } = incrementalContext;
if (isPromise(item)) {
return completePromisedValue(
exeContext,
Expand All @@ -2186,7 +2208,7 @@ function completeStreamItems(
(resolvedItem) => ({
streamRecord,
items: [resolvedItem],
futures: filterFutures(errorPaths, futures),
futures: filterFutures(path, errorPaths, futures),
errors,
}),
(error) => {
Expand Down Expand Up @@ -2250,7 +2272,7 @@ function completeStreamItems(
(resolvedItem) => ({
streamRecord,
items: [resolvedItem],
futures: filterFutures(errorPaths, futures),
futures: filterFutures(path, errorPaths, futures),
errors,
}),
(error) => {
Expand All @@ -2267,7 +2289,7 @@ function completeStreamItems(
return {
streamRecord,
items: [completedItem],
futures: filterFutures(errorPaths, futures),
futures: filterFutures(path, errorPaths, futures),
errors,
};
}

0 comments on commit 984fbd3

Please sign in to comment.