Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core[patch]: Require explicit override for callbacks within a nested config #5994

Merged
merged 8 commits into from
Jul 9, 2024
10 changes: 8 additions & 2 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,17 @@ export { type RunnableInterface, RunnableBatchOptions };
export type RunnableFunc<RunInput, RunOutput> = (
input: RunInput,
options?:
| ({ config?: RunnableConfig } & RunnableConfig)
| ({
/** @deprecated Use top-level config fields instead. */
config?: RunnableConfig;
} & RunnableConfig)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
| Record<string, any>
// eslint-disable-next-line @typescript-eslint/no-explicit-any
| (Record<string, any> & { config: RunnableConfig } & RunnableConfig)
| (Record<string, any> & {
/** @deprecated Use top-level config fields instead. */
config: RunnableConfig;
} & RunnableConfig)
) => RunOutput | Promise<RunOutput>;

export type RunnableMapLike<RunInput, RunOutput> = {
Expand Down
44 changes: 35 additions & 9 deletions langchain-core/src/runnables/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,28 +126,54 @@ const PRIMITIVES = new Set(["string", "number", "boolean"]);
export function ensureConfig<CallOptions extends RunnableConfig>(
config?: CallOptions
): CallOptions {
const loadedConfig =
config ?? AsyncLocalStorageProviderSingleton.getInstance().getStore();
const implicitConfig =
AsyncLocalStorageProviderSingleton.getInstance().getStore();
let empty: RunnableConfig = {
tags: [],
metadata: {},
callbacks: undefined,
recursionLimit: 25,
runId: undefined,
};
if (loadedConfig) {
empty = { ...empty, ...loadedConfig };
if (implicitConfig) {
// Don't allow runId to be loaded implicitly, as this can cause
// child runs to improperly inherit their parents' run ids.
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { runId, ...rest } = implicitConfig;
empty = Object.entries(rest).reduce(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(currentConfig: Record<string, any>, [key, value]) => {
if (value !== undefined) {
// eslint-disable-next-line no-param-reassign
currentConfig[key] = value;
}
return currentConfig;
},
empty
);
}
if (config) {
empty = Object.entries(config).reduce(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(currentConfig: Record<string, any>, [key, value]) => {
if (value !== undefined) {
// eslint-disable-next-line no-param-reassign
currentConfig[key] = value;
}
return currentConfig;
},
empty
);
}
if (loadedConfig?.configurable) {
for (const key of Object.keys(loadedConfig.configurable)) {
if (empty?.configurable) {
for (const key of Object.keys(empty.configurable)) {
if (
PRIMITIVES.has(typeof loadedConfig.configurable[key]) &&
PRIMITIVES.has(typeof empty.configurable[key]) &&
!empty.metadata?.[key]
) {
if (!empty.metadata) {
empty.metadata = {};
}
empty.metadata[key] = loadedConfig.configurable[key];
empty.metadata[key] = empty.configurable[key];
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions langchain-core/src/runnables/history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ export class RunnableWithMessageHistory<
async _enterHistory(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
input: any,
kwargs?: { config?: RunnableConfig }
kwargs?: RunnableConfig
): Promise<BaseMessage[]> {
const history = kwargs?.config?.configurable?.messageHistory;
const history = kwargs?.configurable?.messageHistory;
const messages = await history.getMessages();
if (this.historyMessagesKey === undefined) {
return messages.concat(this._getInputMessages(input));
Expand Down
12 changes: 6 additions & 6 deletions langchain-core/src/runnables/tests/runnable.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,20 +436,20 @@ test("Create a runnable sequence with a static method with invalid output and ca
test("RunnableSequence can pass config to every step in batched request", async () => {
let numSeen = 0;

const addOne = (x: number, options?: { config?: RunnableConfig }) => {
if (options?.config?.configurable?.isPresent === true) {
const addOne = (x: number, options?: RunnableConfig) => {
if (options?.configurable?.isPresent === true) {
numSeen += 1;
}
return x + 1;
};
const addTwo = (x: number, options?: { config?: RunnableConfig }) => {
if (options?.config?.configurable?.isPresent === true) {
const addTwo = (x: number, options?: RunnableConfig) => {
if (options?.configurable?.isPresent === true) {
numSeen += 1;
}
return x + 2;
};
const addThree = (x: number, options?: { config?: RunnableConfig }) => {
if (options?.config?.configurable?.isPresent === true) {
const addThree = (x: number, options?: RunnableConfig) => {
if (options?.configurable?.isPresent === true) {
numSeen += 1;
}
return x + 3;
Expand Down
36 changes: 33 additions & 3 deletions langchain-core/src/singletons/tests/async_local_storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import { AsyncLocalStorageProviderSingleton } from "../index.js";
import { RunnableLambda } from "../../runnables/base.js";
import { FakeListChatModel } from "../../utils/testing/index.js";
import { getCallbackManagerForConfig } from "../../runnables/config.js";
import { BaseCallbackHandler } from "../../callbacks/base.js";

class FakeCallbackHandler extends BaseCallbackHandler {
name = `fake-${v4()}`;
}

test("Config should be automatically populated after setting global async local storage", async () => {
const inner = RunnableLambda.from((_, config) => config);
Expand Down Expand Up @@ -47,7 +52,6 @@ test("Config should be automatically populated after setting global async local
);
const chunks = [];
for await (const chunk of stream) {
console.log(chunk);
chunks.push(chunk);
}
expect(chunks.length).toEqual(1);
Expand Down Expand Up @@ -82,7 +86,6 @@ test("Config should be automatically populated after setting global async local
);
const chunks2 = [];
for await (const chunk of stream2) {
console.log(chunk);
chunks2.push(chunk);
}
expect(chunks2.length).toEqual(1);
Expand Down Expand Up @@ -126,7 +129,6 @@ test("Config should be automatically populated after setting global async local
);
const events = [];
for await (const event of eventStream) {
console.log(event);
events.push(event);
}
expect(
Expand All @@ -143,11 +145,38 @@ test("Runnable streamEvents method with streaming nested in a RunnableLambda", a
responses: ["Hello"],
});
const outerRunId = v4();
const innerRunId = v4();
const innerRunId2 = v4();
const dummyHandler = new FakeCallbackHandler();
const myFunc = async (input: string) => {
const outerCallbackManager = await getCallbackManagerForConfig(
asyncLocalStorage.getStore()
);
expect(outerCallbackManager?.getParentRunId()).toEqual(outerRunId);

const nestedLambdaWithOverriddenCallbacks = RunnableLambda.from(
async (_: string, config) => {
expect(config?.callbacks?.handlers).toEqual([]);
}
);
await nestedLambdaWithOverriddenCallbacks.invoke(input, {
runId: innerRunId,
callbacks: [],
});

const nestedLambdaWithoutOverriddenCallbacks = RunnableLambda.from(
async (_: string, config) => {
const innerCallbackManager = await getCallbackManagerForConfig(
asyncLocalStorage.getStore()
);
expect(innerCallbackManager?.getParentRunId()).toEqual(innerRunId2);
expect(config?.callbacks?.handlers).toContain(dummyHandler);
}
);
await nestedLambdaWithoutOverriddenCallbacks.invoke(input, {
runId: innerRunId2,
});

for await (const _ of await chat.stream(input)) {
// no-op
}
Expand All @@ -159,6 +188,7 @@ test("Runnable streamEvents method with streaming nested in a RunnableLambda", a
for await (const event of myNestedLambda.streamEvents("hello", {
version: "v1",
runId: outerRunId,
callbacks: [dummyHandler],
})) {
events.push(event);
}
Expand Down
Loading