diff --git a/langchain-core/.gitignore b/langchain-core/.gitignore index a2a996516871..927d25edbf1c 100644 --- a/langchain-core/.gitignore +++ b/langchain-core/.gitignore @@ -166,10 +166,6 @@ utils/async_caller.cjs utils/async_caller.js utils/async_caller.d.ts utils/async_caller.d.cts -utils/beta_warning.cjs -utils/beta_warning.js -utils/beta_warning.d.ts -utils/beta_warning.d.cts utils/chunk_array.cjs utils/chunk_array.js utils/chunk_array.d.ts diff --git a/langchain-core/langchain.config.js b/langchain-core/langchain.config.js index b62973d89a51..0b4f41da98ba 100644 --- a/langchain-core/langchain.config.js +++ b/langchain-core/langchain.config.js @@ -54,7 +54,6 @@ export const config = { "tracers/tracer_langchain": "tracers/tracer_langchain", "tracers/tracer_langchain_v1": "tracers/tracer_langchain_v1", "utils/async_caller": "utils/async_caller", - "utils/beta_warning": "utils/beta_warning", "utils/chunk_array": "utils/chunk_array", "utils/env": "utils/env", "utils/event_source_parse": "utils/event_source_parse", diff --git a/langchain-core/package.json b/langchain-core/package.json index 0be8e54f28e1..258045f35fee 100644 --- a/langchain-core/package.json +++ b/langchain-core/package.json @@ -473,15 +473,6 @@ "import": "./utils/async_caller.js", "require": "./utils/async_caller.cjs" }, - "./utils/beta_warning": { - "types": { - "import": "./utils/beta_warning.d.ts", - "require": "./utils/beta_warning.d.cts", - "default": "./utils/beta_warning.d.ts" - }, - "import": "./utils/beta_warning.js", - "require": "./utils/beta_warning.cjs" - }, "./utils/chunk_array": { "types": { "import": "./utils/chunk_array.d.ts", @@ -771,10 +762,6 @@ "utils/async_caller.js", "utils/async_caller.d.ts", "utils/async_caller.d.cts", - "utils/beta_warning.cjs", - "utils/beta_warning.js", - "utils/beta_warning.d.ts", - "utils/beta_warning.d.cts", "utils/chunk_array.cjs", "utils/chunk_array.js", "utils/chunk_array.d.ts", diff --git a/langchain-core/src/callbacks/manager.ts b/langchain-core/src/callbacks/manager.ts index 1ec9f6589f83..834ee61d6a38 100644 --- a/langchain-core/src/callbacks/manager.ts +++ b/langchain-core/src/callbacks/manager.ts @@ -9,7 +9,6 @@ import { NewTokenIndices, } from "./base.js"; import { ConsoleCallbackHandler } from "../tracers/console.js"; -import { getTracingV2CallbackHandler } from "../tracers/initialize.js"; import { type BaseMessage } from "../messages/base.js"; import { getBufferString } from "../messages/utils.js"; import { getEnvironmentVariable } from "../utils/env.js"; @@ -20,6 +19,7 @@ import { import { consumeCallback } from "./promises.js"; import { Serialized } from "../load/serializable.js"; import type { DocumentInterface } from "../documents/document.js"; +import { isTracingEnabled } from "../utils/callbacks.js"; if ( /* #__PURE__ */ getEnvironmentVariable("LANGCHAIN_TRACING_V2") === "true" && @@ -111,7 +111,7 @@ export abstract class BaseCallbackManager { /** * Base class for run manager in LangChain. */ -class BaseRunManager { +export class BaseRunManager { constructor( public readonly runId: string, public readonly handlers: BaseCallbackHandler[], @@ -123,6 +123,10 @@ class BaseRunManager { protected readonly _parentRunId?: string ) {} + get parentRunId() { + return this._parentRunId; + } + async handleText(text: string): Promise { await Promise.all( this.handlers.map((handler) => @@ -962,6 +966,27 @@ export class CallbackManager localMetadata?: Record, options?: CallbackManagerOptions ): Promise { + return this._configureSync( + inheritableHandlers, + localHandlers, + inheritableTags, + localTags, + inheritableMetadata, + localMetadata, + options + ); + } + + // TODO: Deprecate async method in favor of this one. + static _configureSync( + inheritableHandlers?: Callbacks, + localHandlers?: Callbacks, + inheritableTags?: string[], + localTags?: string[], + inheritableMetadata?: Record, + localMetadata?: Record, + options?: CallbackManagerOptions + ) { let callbackManager: CallbackManager | undefined; if (inheritableHandlers || localHandlers) { if (Array.isArray(inheritableHandlers) || !inheritableHandlers) { @@ -984,9 +1009,7 @@ export class CallbackManager const verboseEnabled = getEnvironmentVariable("LANGCHAIN_VERBOSE") === "true" || options?.verbose; - const tracingV2Enabled = - getEnvironmentVariable("LANGCHAIN_TRACING_V2") === "true" || - getEnvironmentVariable("LANGSMITH_TRACING") === "true"; + const tracingV2Enabled = isTracingEnabled(); const tracingEnabled = tracingV2Enabled || @@ -1011,7 +1034,7 @@ export class CallbackManager ) ) { if (tracingV2Enabled) { - const tracerV2 = await getTracingV2CallbackHandler(); + const tracerV2 = new LangChainTracer(); callbackManager.addHandler(tracerV2, true); // handoff between langchain and langsmith/traceable diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index a2eea53ce474..8602faef5118 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -7,10 +7,7 @@ import { isTraceableFunction, } from "langsmith/singletons/traceable"; import type { RunnableInterface, RunnableBatchOptions } from "./types.js"; -import { - CallbackManager, - CallbackManagerForChainRun, -} from "../callbacks/manager.js"; +import { CallbackManagerForChainRun } from "../callbacks/manager.js"; import { LogStreamCallbackHandler, LogStreamCallbackHandlerInput, @@ -2318,15 +2315,19 @@ export class RunnableLambda extends Runnable< } } } + const childConfig = patchConfig(config, { + callbacks: runManager?.getChild(), + recursionLimit: (config?.recursionLimit ?? DEFAULT_RECURSION_LIMIT) - 1, + }); const output = await new Promise( (resolve, reject) => { void AsyncLocalStorageProviderSingleton.getInstance().run( - config, + childConfig, async () => { try { const res = await this.func(finalChunk as RunInput, { - ...config, - config, + ...childConfig, + config: childConfig, }); resolve(res); } catch (e) { @@ -2340,23 +2341,19 @@ export class RunnableLambda extends Runnable< if (config?.recursionLimit === 0) { throw new Error("Recursion limit reached."); } - const stream = await output.stream( - finalChunk as RunInput, - patchConfig(config, { - callbacks: runManager?.getChild(), - recursionLimit: - (config?.recursionLimit ?? DEFAULT_RECURSION_LIMIT) - 1, - }) - ); + const stream = await output.stream(finalChunk as RunInput, childConfig); for await (const chunk of stream) { yield chunk; } } else if (isAsyncIterable(output)) { - for await (const chunk of consumeAsyncIterableInContext(config, output)) { + for await (const chunk of consumeAsyncIterableInContext( + childConfig, + output + )) { yield chunk as RunOutput; } } else if (isIterableIterator(output)) { - for (const chunk of consumeIteratorInContext(config, output)) { + for (const chunk of consumeIteratorInContext(childConfig, output)) { yield chunk as RunOutput; } } else { @@ -2433,14 +2430,9 @@ export class RunnableWithFallbacks extends Runnable< input: RunInput, options?: Partial ): Promise { - const callbackManager_ = await CallbackManager.configure( - options?.callbacks, - undefined, - options?.tags, - undefined, - options?.metadata - ); - const { runId, ...otherOptions } = options ?? {}; + const config = ensureConfig(options); + const callbackManager_ = await getCallbackManagerForConfig(options); + const { runId, ...otherConfigFields } = config; const runManager = await callbackManager_?.handleChainStart( this.toJSON(), _coerceToDict(input, "input"), @@ -2448,14 +2440,14 @@ export class RunnableWithFallbacks extends Runnable< undefined, undefined, undefined, - otherOptions?.runName + otherConfigFields?.runName ); let firstError; for (const runnable of this.runnables()) { try { const output = await runnable.invoke( input, - patchConfig(otherOptions, { callbacks: runManager?.getChild() }) + patchConfig(otherConfigFields, { callbacks: runManager?.getChild() }) ); await runManager?.handleChainEnd(_coerceToDict(output, "output")); return output; @@ -2500,15 +2492,7 @@ export class RunnableWithFallbacks extends Runnable< } const configList = this._getOptionsList(options ?? {}, inputs.length); const callbackManagers = await Promise.all( - configList.map((config) => - CallbackManager.configure( - config?.callbacks, - undefined, - config?.tags, - undefined, - config?.metadata - ) - ) + configList.map((config) => getCallbackManagerForConfig(config)) ); const runManagers = await Promise.all( callbackManagers.map(async (callbackManager, i) => { diff --git a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts index b3e51d2b0347..b5ae10c85cd8 100644 --- a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts +++ b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts @@ -1009,7 +1009,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s event: "on_chat_model_start", name: "my_model", run_id: expect.any(String), - tags: expect.arrayContaining(["seq:step:2", "my_model", "my_chain"]), + tags: expect.arrayContaining(["my_model", "my_chain"]), metadata: { foo: "bar", a: "b", @@ -1027,7 +1027,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s { event: "on_chat_model_stream", run_id: expect.any(String), - tags: expect.arrayContaining(["my_chain", "my_model", "seq:step:2"]), + tags: expect.arrayContaining(["my_chain", "my_model"]), metadata: { a: "b", foo: "bar", @@ -1040,7 +1040,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s { event: "on_chat_model_stream", run_id: expect.any(String), - tags: expect.arrayContaining(["my_chain", "my_model", "seq:step:2"]), + tags: expect.arrayContaining(["my_chain", "my_model"]), metadata: { a: "b", foo: "bar", @@ -1053,7 +1053,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s { event: "on_chat_model_stream", run_id: expect.any(String), - tags: expect.arrayContaining(["my_chain", "my_model", "seq:step:2"]), + tags: expect.arrayContaining(["my_chain", "my_model"]), metadata: { a: "b", foo: "bar", @@ -1066,7 +1066,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s { event: "on_chat_model_stream", run_id: expect.any(String), - tags: expect.arrayContaining(["my_chain", "my_model", "seq:step:2"]), + tags: expect.arrayContaining(["my_chain", "my_model"]), metadata: { a: "b", foo: "bar", @@ -1080,7 +1080,7 @@ test("Chat model that supports streaming, but is invoked, should still emit on_s event: "on_chat_model_end", name: "my_model", run_id: expect.any(String), - tags: expect.arrayContaining(["seq:step:2", "my_model", "my_chain"]), + tags: expect.arrayContaining(["my_model", "my_chain"]), metadata: { foo: "bar", a: "b", @@ -1227,7 +1227,7 @@ test("Chat model that doesn't support streaming, but is invoked, should emit one event: "on_chat_model_start", name: "my_model", run_id: expect.any(String), - tags: expect.arrayContaining(["seq:step:2", "my_model", "my_chain"]), + tags: expect.arrayContaining(["my_model", "my_chain"]), metadata: { foo: "bar", a: "b", @@ -1245,7 +1245,7 @@ test("Chat model that doesn't support streaming, but is invoked, should emit one { event: "on_chat_model_stream", run_id: expect.any(String), - tags: expect.arrayContaining(["my_chain", "my_model", "seq:step:2"]), + tags: expect.arrayContaining(["my_chain", "my_model"]), metadata: { a: "b", foo: "bar", @@ -1259,7 +1259,7 @@ test("Chat model that doesn't support streaming, but is invoked, should emit one event: "on_chat_model_end", name: "my_model", run_id: expect.any(String), - tags: expect.arrayContaining(["seq:step:2", "my_model", "my_chain"]), + tags: expect.arrayContaining(["my_model", "my_chain"]), metadata: { foo: "bar", a: "b", @@ -1417,7 +1417,7 @@ test("LLM that supports streaming, but is invoked, should still emit on_stream e }, }, name: "my_model", - tags: ["seq:step:2", "my_model", "my_chain"], + tags: ["my_model", "my_chain"], run_id: expect.any(String), metadata: { foo: "bar", @@ -1433,7 +1433,7 @@ test("LLM that supports streaming, but is invoked, should still emit on_stream e }, run_id: expect.any(String), name: "my_model", - tags: ["seq:step:2", "my_model", "my_chain"], + tags: ["my_model", "my_chain"], metadata: { foo: "bar", a: "b", @@ -1448,7 +1448,7 @@ test("LLM that supports streaming, but is invoked, should still emit on_stream e }, run_id: expect.any(String), name: "my_model", - tags: ["seq:step:2", "my_model", "my_chain"], + tags: ["my_model", "my_chain"], metadata: { foo: "bar", a: "b", @@ -1463,7 +1463,7 @@ test("LLM that supports streaming, but is invoked, should still emit on_stream e }, run_id: expect.any(String), name: "my_model", - tags: ["seq:step:2", "my_model", "my_chain"], + tags: ["my_model", "my_chain"], metadata: { foo: "bar", a: "b", @@ -1478,7 +1478,7 @@ test("LLM that supports streaming, but is invoked, should still emit on_stream e }, run_id: expect.any(String), name: "my_model", - tags: ["seq:step:2", "my_model", "my_chain"], + tags: ["my_model", "my_chain"], metadata: { foo: "bar", a: "b", @@ -1504,7 +1504,7 @@ test("LLM that supports streaming, but is invoked, should still emit on_stream e }, run_id: expect.any(String), name: "my_model", - tags: ["seq:step:2", "my_model", "my_chain"], + tags: ["my_model", "my_chain"], metadata: { foo: "bar", a: "b", @@ -1654,7 +1654,7 @@ test("LLM that doesn't support streaming, but is invoked, should emit one on_str }, }, name: "my_model", - tags: ["seq:step:2", "my_model", "my_chain"], + tags: ["my_model", "my_chain"], run_id: expect.any(String), metadata: { foo: "bar", @@ -1670,7 +1670,7 @@ test("LLM that doesn't support streaming, but is invoked, should emit one on_str }, run_id: expect.any(String), name: "my_model", - tags: ["seq:step:2", "my_model", "my_chain"], + tags: ["my_model", "my_chain"], metadata: { foo: "bar", a: "b", @@ -1696,7 +1696,7 @@ test("LLM that doesn't support streaming, but is invoked, should emit one on_str }, run_id: expect.any(String), name: "my_model", - tags: ["seq:step:2", "my_model", "my_chain"], + tags: ["my_model", "my_chain"], metadata: { foo: "bar", a: "b", diff --git a/langchain-core/src/singletons/tests/async_local_storage.test.ts b/langchain-core/src/singletons/tests/async_local_storage.test.ts index 850443f4d520..a0068d852949 100644 --- a/langchain-core/src/singletons/tests/async_local_storage.test.ts +++ b/langchain-core/src/singletons/tests/async_local_storage.test.ts @@ -1,8 +1,10 @@ import { test, expect } from "@jest/globals"; +import { v4 } from "uuid"; import { AsyncLocalStorage } from "node:async_hooks"; import { AsyncLocalStorageProviderSingleton } from "../index.js"; import { RunnableLambda } from "../../runnables/base.js"; import { FakeListChatModel } from "../../utils/testing/index.js"; +import { getCallbackManagerForConfig } from "../../runnables/config.js"; test("Config should be automatically populated after setting global async local storage", async () => { const inner = RunnableLambda.from((_, config) => config); @@ -136,10 +138,16 @@ test("Runnable streamEvents method with streaming nested in a RunnableLambda", a AsyncLocalStorageProviderSingleton.initializeGlobalInstance( new AsyncLocalStorage() ); + const asyncLocalStorage = AsyncLocalStorageProviderSingleton.getInstance(); const chat = new FakeListChatModel({ responses: ["Hello"], }); + const outerRunId = v4(); const myFunc = async (input: string) => { + const outerCallbackManager = await getCallbackManagerForConfig( + asyncLocalStorage.getStore() + ); + expect(outerCallbackManager?.getParentRunId()).toEqual(outerRunId); for await (const _ of await chat.stream(input)) { // no-op } @@ -150,8 +158,8 @@ test("Runnable streamEvents method with streaming nested in a RunnableLambda", a const events = []; for await (const event of myNestedLambda.streamEvents("hello", { version: "v1", + runId: outerRunId, })) { - console.log(event); events.push(event); } const chatModelStreamEvent = events.find((event) => { diff --git a/langchain-core/src/tracers/initialize.ts b/langchain-core/src/tracers/initialize.ts index e956b5075aa1..fff2eaa9f261 100644 --- a/langchain-core/src/tracers/initialize.ts +++ b/langchain-core/src/tracers/initialize.ts @@ -23,6 +23,8 @@ export async function getTracingCallbackHandler( } /** + * @deprecated Instantiate directly using the LangChainTracer constructor. + * * Function that returns an instance of `LangChainTracer`. It does not * load any session data. * @returns An instance of `LangChainTracer`. diff --git a/langchain-core/src/utils/beta_warning.ts b/langchain-core/src/utils/beta_warning.ts deleted file mode 100644 index 20fb443295da..000000000000 --- a/langchain-core/src/utils/beta_warning.ts +++ /dev/null @@ -1,9 +0,0 @@ -/** - * Util function for logging a warning when a method is called. - * @param {string} func The name of the function that is in beta. - */ -export function betaWarning(func: string) { - console.warn( - `The function '${func}' is in beta. It is actively being worked on, so the API may change.` - ); -} diff --git a/langchain-core/src/utils/callbacks.ts b/langchain-core/src/utils/callbacks.ts new file mode 100644 index 000000000000..920c75b633ae --- /dev/null +++ b/langchain-core/src/utils/callbacks.ts @@ -0,0 +1,14 @@ +import { getEnvironmentVariable } from "./env.js"; + +export const isTracingEnabled = (tracingEnabled?: boolean): boolean => { + if (tracingEnabled !== undefined) { + return tracingEnabled; + } + const envVars = [ + "LANGSMITH_TRACING_V2", + "LANGCHAIN_TRACING_V2", + "LANGSMITH_TRACING", + "LANGCHAIN_TRACING", + ]; + return !!envVars.find((envVar) => getEnvironmentVariable(envVar) === "true"); +};