Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core[patch]: Move async generator consumption code into local storage context #5439

Merged
merged 4 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,11 @@ export abstract class Runnable<
): Promise<IterableReadableStream<RunOutput>> {
// Buffer the first streamed chunk to allow for initial errors
// to surface immediately.
const wrappedGenerator = new AsyncGeneratorWithSetup(
this._streamIterator(input, ensureConfig(options))
);
const config = ensureConfig(options);
const wrappedGenerator = new AsyncGeneratorWithSetup({
generator: this._streamIterator(input, config),
config,
});
await wrappedGenerator.setup;
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
}
Expand Down Expand Up @@ -1945,9 +1947,11 @@ export class RunnableMap<
async function* generator() {
yield input;
}
const wrappedGenerator = new AsyncGeneratorWithSetup(
this.transform(generator(), options)
);
const config = ensureConfig(options);
const wrappedGenerator = new AsyncGeneratorWithSetup({
generator: this.transform(generator(), config),
config,
});
await wrappedGenerator.setup;
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
}
Expand Down Expand Up @@ -2151,9 +2155,11 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
async function* generator() {
yield input;
}
const wrappedGenerator = new AsyncGeneratorWithSetup(
this.transform(generator(), options)
);
const config = ensureConfig(options);
const wrappedGenerator = new AsyncGeneratorWithSetup({
generator: this.transform(generator(), config),
config,
});
await wrappedGenerator.setup;
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
}
Expand Down Expand Up @@ -2458,9 +2464,11 @@ export class RunnableAssign<
async function* generator() {
yield input;
}
const wrappedGenerator = new AsyncGeneratorWithSetup(
this.transform(generator(), options)
);
const config = ensureConfig(options);
const wrappedGenerator = new AsyncGeneratorWithSetup({
generator: this.transform(generator(), config),
config,
});
await wrappedGenerator.setup;
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
}
Expand Down Expand Up @@ -2549,9 +2557,11 @@ export class RunnablePick<
async function* generator() {
yield input;
}
const wrappedGenerator = new AsyncGeneratorWithSetup(
this.transform(generator(), options)
);
const config = ensureConfig(options);
const wrappedGenerator = new AsyncGeneratorWithSetup({
generator: this.transform(generator(), config),
config,
});
await wrappedGenerator.setup;
return IterableReadableStream.fromAsyncGenerator(wrappedGenerator);
}
Expand Down
27 changes: 27 additions & 0 deletions langchain-core/src/singletons/tests/async_local_storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,30 @@ test("Config should be automatically populated after setting global async local
events.filter((event) => event.event === "on_llm_start").length
).toEqual(1);
});

test("Runnable streamEvents method with streaming nested in a RunnableLambda", async () => {
AsyncLocalStorageProviderSingleton.initializeGlobalInstance(
new AsyncLocalStorage()
);
const chat = new FakeListChatModel({
responses: ["Hello"],
});
const myFunc = async (input: string) => {
for await (const _ of await chat.stream(input)) {
}
};

const myNestedLambda = RunnableLambda.from(myFunc);

const events = [];
for await (const event of myNestedLambda.streamEvents("hello", {
version: "v1",
})) {
console.log(event);
events.push(event);
}
const chatModelStreamEvent = events.find((event) => {
return event.event === "on_llm_stream";
});
expect(chatModelStreamEvent).toBeDefined();
});
37 changes: 27 additions & 10 deletions langchain-core/src/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Make this a type to override ReadableStream's async iterator type in case
// the popular web-streams-polyfill is imported - the supplied types
import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js";

// in this case don't quite match.
export type IterableReadableStreamInterface<T> = ReadableStream<T> &
AsyncIterable<T>;
Expand Down Expand Up @@ -182,23 +184,33 @@ export class AsyncGeneratorWithSetup<

public setup: Promise<S>;

public config?: unknown;

private firstResult: Promise<IteratorResult<T>>;

private firstResultUsed = false;

constructor(generator: AsyncGenerator<T>, startSetup?: () => Promise<S>) {
this.generator = generator;
constructor(params: {
generator: AsyncGenerator<T>;
startSetup?: () => Promise<S>;
config?: unknown;
}) {
this.generator = params.generator;
this.config = params.config;
// setup is a promise that resolves only after the first iterator value
// is available. this is useful when setup of several piped generators
// needs to happen in logical order, ie. in the order in which input to
// to each generator is available.
this.setup = new Promise((resolve, reject) => {
this.firstResult = generator.next();
if (startSetup) {
this.firstResult.then(startSetup).then(resolve, reject);
} else {
this.firstResult.then((_result) => resolve(undefined as S), reject);
}
const storage = AsyncLocalStorageProviderSingleton.getInstance();
storage.run(params.config, async () => {
jacoblee93 marked this conversation as resolved.
Show resolved Hide resolved
this.firstResult = params.generator.next();
if (params.startSetup) {
this.firstResult.then(params.startSetup).then(resolve, reject);
} else {
this.firstResult.then((_result) => resolve(undefined as S), reject);
}
});
});
}

Expand All @@ -208,7 +220,12 @@ export class AsyncGeneratorWithSetup<
return this.firstResult;
}

return this.generator.next(...args);
const storage = AsyncLocalStorageProviderSingleton.getInstance();
return new Promise((resolve) => {
jacoblee93 marked this conversation as resolved.
Show resolved Hide resolved
storage.run(this.config, async () => {
resolve(this.generator.next(...args));
});
});
}

async return(
Expand Down Expand Up @@ -245,7 +262,7 @@ export async function pipeGeneratorWithSetup<
startSetup: () => Promise<S>,
...args: A
) {
const gen = new AsyncGeneratorWithSetup(generator, startSetup);
const gen = new AsyncGeneratorWithSetup({ generator, startSetup });
const setup = await gen.setup;
return { output: to(gen, setup, ...args), setup };
}
Loading