Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core[patch]: Adds streaming for RunnableWithFallbacks, update input #6487

Merged
merged 1 commit into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 80 additions & 4 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,18 @@ export abstract class Runnable<
* @param fields.fallbacks Other runnables to call if the runnable errors.
* @returns A new RunnableWithFallbacks.
*/
withFallbacks(fields: {
fallbacks: Runnable<RunInput, RunOutput>[];
}): RunnableWithFallbacks<RunInput, RunOutput> {
withFallbacks(
fields:
| {
fallbacks: Runnable<RunInput, RunOutput>[];
}
| Runnable<RunInput, RunOutput>[]
): RunnableWithFallbacks<RunInput, RunOutput> {
const fallbacks = Array.isArray(fields) ? fields : fields.fallbacks;
// eslint-disable-next-line @typescript-eslint/no-use-before-define
return new RunnableWithFallbacks<RunInput, RunOutput>({
runnable: this,
fallbacks: fields.fallbacks,
fallbacks,
});
}

Expand Down Expand Up @@ -2493,6 +2498,22 @@ export class RunnableParallel<RunInput> extends RunnableMap<RunInput> {}

/**
* A Runnable that can fallback to other Runnables if it fails.
* External APIs (e.g., APIs for a language model) may at times experience
* degraded performance or even downtime.
*
* In these cases, it can be useful to have a fallback Runnable that can be
* used in place of the original Runnable (e.g., fallback to another LLM provider).
*
* Fallbacks can be defined at the level of a single Runnable, or at the level
* of a chain of Runnables. Fallbacks are tried in order until one succeeds or
* all fail.
*
* While you can instantiate a `RunnableWithFallbacks` directly, it is usually
* more convenient to use the `withFallbacks` method on an existing Runnable.
*
* When streaming, fallbacks will only be called on failures during the initial
* stream creation. Errors that occur after a stream starts will not fallback
* to the next Runnable.
*/
export class RunnableWithFallbacks<RunInput, RunOutput> extends Runnable<
RunInput,
Expand Down Expand Up @@ -2565,6 +2586,61 @@ export class RunnableWithFallbacks<RunInput, RunOutput> extends Runnable<
throw firstError;
}

async *_streamIterator(
input: RunInput,
options?: Partial<RunnableConfig> | undefined
): AsyncGenerator<RunOutput> {
const config = ensureConfig(options);
const callbackManager_ = await getCallbackManagerForConfig(options);
const { runId, ...otherConfigFields } = config;
const runManager = await callbackManager_?.handleChainStart(
this.toJSON(),
_coerceToDict(input, "input"),
runId,
undefined,
undefined,
undefined,
otherConfigFields?.runName
);
let firstError;
let stream;
for (const runnable of this.runnables()) {
config?.signal?.throwIfAborted();
const childConfig = patchConfig(otherConfigFields, {
callbacks: runManager?.getChild(),
});
try {
stream = await runnable.stream(input, childConfig);
break;
} catch (e) {
if (firstError === undefined) {
firstError = e;
}
}
}
if (stream === undefined) {
const error =
firstError ?? new Error("No error stored at end of fallback.");
await runManager?.handleChainError(error);
throw error;
}
let output;
try {
for await (const chunk of stream) {
yield chunk;
try {
output = output === undefined ? output : concat(output, chunk);
} catch (e) {
output = undefined;
}
}
} catch (e) {
await runManager?.handleChainError(e);
throw e;
}
await runManager?.handleChainEnd(_coerceToDict(output, "output"));
}

async batch(
inputs: RunInput[],
options?: Partial<RunnableConfig> | Partial<RunnableConfig>[],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable no-promise-executor-return */
/* eslint-disable @typescript-eslint/no-explicit-any */
import { test, expect } from "@jest/globals";
import { FakeLLM } from "../../utils/testing/index.js";
import { FakeLLM, FakeStreamingLLM } from "../../utils/testing/index.js";

test("RunnableWithFallbacks", async () => {
const llm = new FakeLLM({
Expand Down Expand Up @@ -36,3 +36,22 @@ test("RunnableWithFallbacks batch", async () => {
]);
expect(result2).toEqual(["What up 1", "What up 2", "What up 3"]);
});

test("RunnableWithFallbacks stream", async () => {
const llm = new FakeStreamingLLM({
thrownErrorString: "Bad error!",
});
await expect(async () => {
await llm.stream("What up");
}).rejects.toThrow();
const llmWithFallbacks = llm.withFallbacks({
fallbacks: [new FakeStreamingLLM({})],
});
const stream = await llmWithFallbacks.stream("What up");
const chunks = [];
for await (const chunk of stream) {
chunks.push(chunk);
}
expect(chunks.length).toBeGreaterThan(1);
expect(chunks.join("")).toEqual("What up");
});
Loading