Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core[minor]: Use run tree as stored value in async local storage #5992

Merged
merged 38 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
649232b
Use run tree as stored value in async local storage
jacoblee93 Jul 7, 2024
7e2c703
Fix parent run tracking when streaming runnable lambdas
jacoblee93 Jul 7, 2024
9c33b0c
lint
jacoblee93 Jul 7, 2024
9eb7870
Merge
jacoblee93 Jul 7, 2024
69aa1b5
Merge
jacoblee93 Jul 7, 2024
73a0698
Clean up
jacoblee93 Jul 7, 2024
abe4d2a
Naming
jacoblee93 Jul 7, 2024
d8dc43b
Clean up
jacoblee93 Jul 7, 2024
3b074b7
Reduce changes
jacoblee93 Jul 7, 2024
9d6c193
Reduce diff
jacoblee93 Jul 7, 2024
07c24f6
Readd exposed method
jacoblee93 Jul 7, 2024
11ee418
Merge
jacoblee93 Jul 7, 2024
528cbe1
Merge config
dqbd Jul 15, 2024
038ad0e
Check if we should include the tracer depending on the context
dqbd Jul 16, 2024
80ca236
Reconstruct the run tree from run map
dqbd Jul 16, 2024
4f50191
Merge branch 'main' into jacob/interop
dqbd Jul 16, 2024
27d93a0
Update note
dqbd Jul 16, 2024
83c81e8
Use a symbol
dqbd Jul 16, 2024
c4f73fa
Fix ALS test
dqbd Jul 16, 2024
0210975
Use the ALS singleton from LC
dqbd Jul 16, 2024
1746136
Remove circular dep
dqbd Jul 16, 2024
d348779
Expose method for updating tracer from run tree
dqbd Jul 16, 2024
03efaa0
Send a dummy run tree in case there's no tracer present
dqbd Jul 16, 2024
347dcb3
Prevent inheriting runName
dqbd Jul 16, 2024
97a0a19
Merge branch 'main' of https://github.com/hwchase17/langchainjs into …
jacoblee93 Jul 18, 2024
b5fa41f
Make sync operations in base tracer execute first
jacoblee93 Jul 18, 2024
4cc9cfe
Adds unit test
jacoblee93 Jul 18, 2024
14d70b9
Streaming tests
jacoblee93 Jul 18, 2024
bdb3531
Revert base tracer
jacoblee93 Jul 19, 2024
95fc457
Adds sync methods for adding runs to run map for tracers
jacoblee93 Jul 19, 2024
86d94ae
Docstrings, lint
jacoblee93 Jul 19, 2024
5ca500b
Merge
jacoblee93 Jul 19, 2024
bd75f55
Fix test
jacoblee93 Jul 19, 2024
2a7ae77
Relax test
jacoblee93 Jul 19, 2024
3a6b48e
Merge branch 'main' of https://github.com/hwchase17/langchainjs into …
jacoblee93 Jul 19, 2024
609851f
Fix tracing for streaming calls with no config
jacoblee93 Jul 20, 2024
d643a62
Adds a proper tracing test for tracers initalized from env vars
jacoblee93 Jul 22, 2024
22254ea
Bump LangSmith version
jacoblee93 Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions langchain-core/src/callbacks/manager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { v4 as uuidv4 } from "uuid";
import { AsyncLocalStorageProviderSingleton } from "langsmith/singletons/traceable";
import { AgentAction, AgentFinish } from "../agents.js";
import type { ChainValues } from "../utils/types/index.js";
import { LLMResult } from "../outputs.js";
Expand All @@ -22,7 +23,7 @@ import type { DocumentInterface } from "../documents/document.js";
import { isTracingEnabled } from "../utils/callbacks.js";

if (
/* #__PURE__ */ getEnvironmentVariable("LANGCHAIN_TRACING_V2") === "true" &&
/* #__PURE__ */ isTracingEnabled() &&
/* #__PURE__ */ getEnvironmentVariable("LANGCHAIN_CALLBACKS_BACKGROUND") !==
"true"
) {
Expand Down Expand Up @@ -1009,7 +1010,9 @@ export class CallbackManager
const verboseEnabled =
getEnvironmentVariable("LANGCHAIN_VERBOSE") === "true" ||
options?.verbose;
const tracingV2Enabled = isTracingEnabled();
const tracingV2Enabled =
AsyncLocalStorageProviderSingleton.getInstance().getStore()
?.tracingEnabled || isTracingEnabled();

const tracingEnabled =
tracingV2Enabled ||
Expand Down
4 changes: 2 additions & 2 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2224,7 +2224,7 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
callbacks: runManager?.getChild(),
recursionLimit: (config?.recursionLimit ?? DEFAULT_RECURSION_LIMIT) - 1,
});
void AsyncLocalStorageProviderSingleton.getInstance().run(
void AsyncLocalStorageProviderSingleton.runWithConfig(
childConfig,
async () => {
try {
Expand Down Expand Up @@ -2321,7 +2321,7 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
});
const output = await new Promise<RunOutput | Runnable>(
(resolve, reject) => {
void AsyncLocalStorageProviderSingleton.getInstance().run(
void AsyncLocalStorageProviderSingleton.runWithConfig(
childConfig,
async () => {
try {
Expand Down
37 changes: 11 additions & 26 deletions langchain-core/src/runnables/config.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,13 @@
import {
type BaseCallbackConfig,
CallbackManager,
ensureHandler,
} from "../callbacks/manager.js";
import { CallbackManager, ensureHandler } from "../callbacks/manager.js";
import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js";
import { RunnableConfig } from "./types.js";
jacoblee93 marked this conversation as resolved.
Show resolved Hide resolved

export const DEFAULT_RECURSION_LIMIT = 25;

export interface RunnableConfig extends BaseCallbackConfig {
/**
* Runtime values for attributes previously made configurable on this Runnable,
* or sub-Runnables.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
configurable?: Record<string, any>;

/**
* Maximum number of times a call can recurse. If not provided, defaults to 25.
*/
recursionLimit?: number;

/** Maximum number of parallel calls to make. */
maxConcurrency?: number;
}
export { type RunnableConfig };

export async function getCallbackManagerForConfig(config?: RunnableConfig) {
return CallbackManager.configure(
return CallbackManager._configureSync(
config?.callbacks,
undefined,
config?.tags,
Expand Down Expand Up @@ -119,15 +101,18 @@ const PRIMITIVES = new Set(["string", "number", "boolean"]);

/**
* Ensure that a passed config is an object with all required keys present.
*
* Note: To make sure async local storage loading works correctly, this
* should not be called with a default or prepopulated config argument.
*/
export function ensureConfig<CallOptions extends RunnableConfig>(
config?: CallOptions
): CallOptions {
const inheritedConfig =
AsyncLocalStorageProviderSingleton.getRunnableConfig();

const loadedConfig =
config ?? AsyncLocalStorageProviderSingleton.getInstance().getStore();
inheritedConfig && config
? { ...inheritedConfig, ...config }
: config ?? inheritedConfig;

let empty: RunnableConfig = {
tags: [],
metadata: {},
Expand Down
16 changes: 9 additions & 7 deletions langchain-core/src/runnables/iter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ export function* consumeIteratorInContext<T>(
context: Partial<RunnableConfig> | undefined,
iter: IterableIterator<T>
): IterableIterator<T> {
const storage = AsyncLocalStorageProviderSingleton.getInstance();
while (true) {
const { value, done } = storage.run(context, iter.next.bind(iter));
const { value, done } = AsyncLocalStorageProviderSingleton.runWithConfig(
context,
iter.next.bind(iter)
);
if (done) {
break;
} else {
Expand All @@ -49,13 +51,13 @@ export async function* consumeAsyncIterableInContext<T>(
context: Partial<RunnableConfig> | undefined,
iter: AsyncIterable<T>
): AsyncIterableIterator<T> {
const storage = AsyncLocalStorageProviderSingleton.getInstance();
const iterator = iter[Symbol.asyncIterator]();
while (true) {
const { value, done } = await storage.run(
context,
iterator.next.bind(iter)
);
const { value, done } =
await AsyncLocalStorageProviderSingleton.runWithConfig(
context,
iterator.next.bind(iter)
);
if (done) {
break;
} else {
Expand Down
19 changes: 18 additions & 1 deletion langchain-core/src/runnables/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { z } from "zod";
import type { RunnableConfig } from "./config.js";
import type { IterableReadableStreamInterface } from "../utils/stream.js";
import type { SerializableInterface } from "../load/serializable.js";
import type { BaseCallbackConfig } from "../callbacks/manager.js";

export type RunnableBatchOptions = {
/** @deprecated Pass in via the standard runnable config object instead */
Expand Down Expand Up @@ -73,3 +73,20 @@ export interface Node {
id: string;
data: RunnableIOSchema | RunnableInterface;
}

export interface RunnableConfig extends BaseCallbackConfig {
/**
* Runtime values for attributes previously made configurable on this Runnable,
* or sub-Runnables.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
configurable?: Record<string, any>;

/**
* Maximum number of times a call can recurse. If not provided, defaults to 25.
*/
recursionLimit?: number;

/** Maximum number of parallel calls to make. */
maxConcurrency?: number;
}
46 changes: 43 additions & 3 deletions langchain-core/src/singletons/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { CallbackManager } from "../callbacks/manager.js";
import { LangChainTracer } from "../tracers/tracer_langchain.js";

export interface AsyncLocalStorageInterface {
getStore: () => any | undefined;
Expand All @@ -21,14 +23,52 @@ const mockAsyncLocalStorage = new MockAsyncLocalStorage();
class AsyncLocalStorageProvider {
getInstance(): AsyncLocalStorageInterface {
return (
(globalThis as any).__lc_tracing_async_local_storage ??
(globalThis as any).__lc_tracing_async_local_storage_v2 ??
dqbd marked this conversation as resolved.
Show resolved Hide resolved
mockAsyncLocalStorage
);
}

getRunnableConfig() {
const storage = this.getInstance();
// this has the runnable config
// which means that I should also have an instance of a LangChainTracer
// with the run map prepopulated
return storage.getStore()?.extra?.[Symbol.for("lc:child_config")];
}

runWithConfig<T>(config: any, callback: () => T): T {
const callbackManager = CallbackManager._configureSync(
config?.callbacks,
undefined,
config?.tags,
undefined,
config?.metadata
);
const storage = this.getInstance();
const parentRunId = callbackManager?.getParentRunId();

const langChainTracer = callbackManager?.handlers?.find(
(handler) => handler?.name === "langchain_tracer"
) as LangChainTracer | undefined;

const runTree =
langChainTracer && parentRunId
? langChainTracer.convertToRunTree(parentRunId)
: undefined;

if (runTree) {
runTree.extra = {
...runTree.extra,
[Symbol.for("lc:child_config")]: config,
};
}

return storage.run(runTree, callback);
}

initializeGlobalInstance(instance: AsyncLocalStorageInterface) {
if ((globalThis as any).__lc_tracing_async_local_storage === undefined) {
(globalThis as any).__lc_tracing_async_local_storage = instance;
if ((globalThis as any).__lc_tracing_async_local_storage_v2 === undefined) {
(globalThis as any).__lc_tracing_async_local_storage_v2 = instance;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,13 @@ test("Runnable streamEvents method with streaming nested in a RunnableLambda", a
AsyncLocalStorageProviderSingleton.initializeGlobalInstance(
new AsyncLocalStorage()
);
const asyncLocalStorage = AsyncLocalStorageProviderSingleton.getInstance();
const chat = new FakeListChatModel({
responses: ["Hello"],
});
const outerRunId = v4();
const myFunc = async (input: string) => {
const outerCallbackManager = await getCallbackManagerForConfig(
asyncLocalStorage.getStore()
AsyncLocalStorageProviderSingleton.getRunnableConfig()
);
expect(outerCallbackManager?.getParentRunId()).toEqual(outerRunId);
for await (const _ of await chat.stream(input)) {
Expand Down
45 changes: 45 additions & 0 deletions langchain-core/src/tracers/tracer_langchain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,49 @@ export class LangChainTracer
return undefined;
}
}

convertToRunTree(id: string): RunTree | undefined {
// create a run tree from a run map
const runTreeMap: Record<string, RunTree> = {};
const runTreeList: [id: string, dotted_order: string | undefined][] = [];
for (const [id, run] of this.runMap) {
// TODO: this loses object reference equality
dqbd marked this conversation as resolved.
Show resolved Hide resolved
// wrap it in a proxy to copy properties back to the original run map
const runTree = new RunTree({
...run,
child_runs: [],
parent_run: undefined,

// inherited properties
client: this.client,
project_name: this.projectName,
reference_example_id: this.exampleId,
tracingEnabled: true,
});

runTreeMap[id] = runTree;
runTreeList.push([id, run.dotted_order]);
}

runTreeList.sort((a, b) => {
if (!a[1] || !b[1]) return 0;
return a[1].localeCompare(b[1]);
});

for (const [id] of runTreeList) {
const run = this.runMap.get(id);
const runTree = runTreeMap[id];
if (!run || !runTree) continue;

if (run.parent_run_id) {
const parentRunTree = runTreeMap[run.parent_run_id];
if (parentRunTree) {
parentRunTree.child_runs.push(runTree);
runTree.parent_run = parentRunTree;
}
}
}

return runTreeMap[id];
}
}
39 changes: 23 additions & 16 deletions langchain-core/src/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Make this a type to override ReadableStream's async iterator type in case
// the popular web-streams-polyfill is imported - the supplied types
import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js";

// in this case don't quite match.
// Make this a type to override ReadableStream's async iterator type in case
// the popular web-streams-polyfill is imported - the supplied types
// in that case don't quite match.
export type IterableReadableStreamInterface<T> = ReadableStream<T> &
AsyncIterable<T>;

Expand Down Expand Up @@ -202,15 +202,17 @@ export class AsyncGeneratorWithSetup<
// needs to happen in logical order, ie. in the order in which input to
// to each generator is available.
this.setup = new Promise((resolve, reject) => {
const storage = AsyncLocalStorageProviderSingleton.getInstance();
void storage.run(params.config, async () => {
this.firstResult = params.generator.next();
if (params.startSetup) {
this.firstResult.then(params.startSetup).then(resolve, reject);
} else {
this.firstResult.then((_result) => resolve(undefined as S), reject);
void AsyncLocalStorageProviderSingleton.runWithConfig(
params.config,
async () => {
this.firstResult = params.generator.next();
if (params.startSetup) {
this.firstResult.then(params.startSetup).then(resolve, reject);
} else {
this.firstResult.then((_result) => resolve(undefined as S), reject);
}
}
});
);
});
}

Expand All @@ -220,10 +222,12 @@ export class AsyncGeneratorWithSetup<
return this.firstResult;
}

const storage = AsyncLocalStorageProviderSingleton.getInstance();
return storage.run(this.config, async () => {
return this.generator.next(...args);
});
return AsyncLocalStorageProviderSingleton.runWithConfig(
this.config,
async () => {
return this.generator.next(...args);
}
);
}

async return(
Expand Down Expand Up @@ -260,7 +264,10 @@ export async function pipeGeneratorWithSetup<
startSetup: () => Promise<S>,
...args: A
) {
const gen = new AsyncGeneratorWithSetup({ generator, startSetup });
const gen = new AsyncGeneratorWithSetup({
generator,
startSetup,
});
const setup = await gen.setup;
return { output: to(gen, setup, ...args), setup };
}