From c2f98188a12b554860b77193df74766cfb5698a6 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Wed, 31 Jul 2024 17:52:56 -0700 Subject: [PATCH 01/16] core: Add signal/timeout options to RunnableConfig - Handled by all built-in runnables - Handled by all utility methods in base runnable, which should propagate to basically all runnables --- langchain-core/src/language_models/base.ts | 12 -- .../src/language_models/chat_models.ts | 18 +-- langchain-core/src/language_models/llms.ts | 18 +-- langchain-core/src/runnables/base.ts | 110 ++++++++++++++++-- langchain-core/src/runnables/config.ts | 24 ++++ langchain-core/src/runnables/remote.ts | 91 +++++++++------ langchain-core/src/runnables/types.ts | 12 ++ langchain-core/src/utils/stream.ts | 25 +++- 8 files changed, 231 insertions(+), 79 deletions(-) diff --git a/langchain-core/src/language_models/base.ts b/langchain-core/src/language_models/base.ts index 8adea3e83b4e..0e8af1bc32bf 100644 --- a/langchain-core/src/language_models/base.ts +++ b/langchain-core/src/language_models/base.ts @@ -207,18 +207,6 @@ export interface BaseLanguageModelCallOptions extends RunnableConfig { * If not provided, the default stop tokens for the model will be used. */ stop?: string[]; - - /** - * Timeout for this call in milliseconds. - */ - timeout?: number; - - /** - * Abort signal for this call. - * If provided, the call will be aborted when the signal is aborted. - * @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal - */ - signal?: AbortSignal; } export interface FunctionDefinition { diff --git a/langchain-core/src/language_models/chat_models.ts b/langchain-core/src/language_models/chat_models.ts index 30256073c33a..b60baecb3a20 100644 --- a/langchain-core/src/language_models/chat_models.ts +++ b/langchain-core/src/language_models/chat_models.ts @@ -145,7 +145,7 @@ export abstract class BaseChatModel< > extends BaseLanguageModel { declare ParsedCallOptions: Omit< CallOptions, - keyof RunnableConfig & "timeout" + Exclude >; // Only ever instantiated in main LangChain @@ -159,14 +159,13 @@ export abstract class BaseChatModel< ...llmOutputs: LLMResult["llmOutput"][] ): LLMResult["llmOutput"]; - protected _separateRunnableConfigFromCallOptions( + protected _separateRunnableConfigFromCallOptionsCompat( options?: Partial ): [RunnableConfig, this["ParsedCallOptions"]] { + // For backwards compat, keep `signal` in both runnableConfig and callOptions const [runnableConfig, callOptions] = super._separateRunnableConfigFromCallOptions(options); - if (callOptions?.timeout && !callOptions.signal) { - callOptions.signal = AbortSignal.timeout(callOptions.timeout); - } + (callOptions as this["ParsedCallOptions"]).signal = runnableConfig.signal; return [runnableConfig, callOptions as this["ParsedCallOptions"]]; } @@ -232,7 +231,7 @@ export abstract class BaseChatModel< const prompt = BaseChatModel._convertInputToPromptValue(input); const messages = prompt.toChatMessages(); const [runnableConfig, callOptions] = - this._separateRunnableConfigFromCallOptions(options); + this._separateRunnableConfigFromCallOptionsCompat(options); const inheritableMetadata = { ...runnableConfig.metadata, @@ -578,7 +577,7 @@ export abstract class BaseChatModel< ); const [runnableConfig, callOptions] = - this._separateRunnableConfigFromCallOptions(parsedOptions); + this._separateRunnableConfigFromCallOptionsCompat(parsedOptions); runnableConfig.callbacks = runnableConfig.callbacks ?? callbacks; if (!this.cache) { @@ -586,8 +585,9 @@ export abstract class BaseChatModel< } const { cache } = this; - const llmStringKey = - this._getSerializedCacheKeyParametersForCall(callOptions); + const llmStringKey = this._getSerializedCacheKeyParametersForCall( + callOptions as CallOptions + ); const { generations, missingPromptIndices } = await this._generateCached({ messages: baseMessages, diff --git a/langchain-core/src/language_models/llms.ts b/langchain-core/src/language_models/llms.ts index 20b0e812deb7..f6df3677dfc1 100644 --- a/langchain-core/src/language_models/llms.ts +++ b/langchain-core/src/language_models/llms.ts @@ -63,7 +63,7 @@ export abstract class BaseLLM< > extends BaseLanguageModel { declare ParsedCallOptions: Omit< CallOptions, - keyof RunnableConfig & "timeout" + Exclude >; // Only ever instantiated in main LangChain @@ -103,14 +103,13 @@ export abstract class BaseLLM< throw new Error("Not implemented."); } - protected _separateRunnableConfigFromCallOptions( + protected _separateRunnableConfigFromCallOptionsCompat( options?: Partial ): [RunnableConfig, this["ParsedCallOptions"]] { + // For backwards compat, keep `signal` in both runnableConfig and callOptions const [runnableConfig, callOptions] = super._separateRunnableConfigFromCallOptions(options); - if (callOptions?.timeout && !callOptions.signal) { - callOptions.signal = AbortSignal.timeout(callOptions.timeout); - } + (callOptions as this["ParsedCallOptions"]).signal = runnableConfig.signal; return [runnableConfig, callOptions as this["ParsedCallOptions"]]; } @@ -126,7 +125,7 @@ export abstract class BaseLLM< } else { const prompt = BaseLLM._convertInputToPromptValue(input); const [runnableConfig, callOptions] = - this._separateRunnableConfigFromCallOptions(options); + this._separateRunnableConfigFromCallOptionsCompat(options); const callbackManager_ = await CallbackManager.configure( runnableConfig.callbacks, this.callbacks, @@ -461,7 +460,7 @@ export abstract class BaseLLM< } const [runnableConfig, callOptions] = - this._separateRunnableConfigFromCallOptions(parsedOptions); + this._separateRunnableConfigFromCallOptionsCompat(parsedOptions); runnableConfig.callbacks = runnableConfig.callbacks ?? callbacks; if (!this.cache) { @@ -469,8 +468,9 @@ export abstract class BaseLLM< } const { cache } = this; - const llmStringKey = - this._getSerializedCacheKeyParametersForCall(callOptions); + const llmStringKey = this._getSerializedCacheKeyParametersForCall( + callOptions as CallOptions + ); const { generations, missingPromptIndices } = await this._generateCached({ prompts, cache, diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index 4cb65f379f22..ba19752c8f10 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -339,6 +339,8 @@ export abstract class Runnable< recursionLimit: options.recursionLimit, maxConcurrency: options.maxConcurrency, runId: options.runId, + timeout: options.timeout, + signal: options.signal, }); } const callOptions = { ...(options as Partial) }; @@ -350,6 +352,8 @@ export abstract class Runnable< delete callOptions.recursionLimit; delete callOptions.maxConcurrency; delete callOptions.runId; + delete callOptions.timeout; + delete callOptions.signal; return [runnableConfig, callOptions]; } @@ -378,7 +382,17 @@ export abstract class Runnable< delete config.runId; let output; try { - output = await func.call(this, input, config, runManager); + const promise = func.call(this, input, config, runManager); + output = options?.signal + ? await Promise.race([ + promise, + new Promise((_, reject) => { + options.signal?.addEventListener("abort", () => { + reject(new Error("AbortError")); + }); + }), + ]) + : await promise; } catch (e) { await runManager?.handleChainError(e); throw e; @@ -430,13 +444,23 @@ export abstract class Runnable< ); let outputs: (RunOutput | Error)[]; try { - outputs = await func.call( + const promise = func.call( this, inputs, optionsList, runManagers, batchOptions ); + outputs = optionsList?.[0]?.signal + ? await Promise.race([ + promise, + new Promise((_, reject) => { + optionsList?.[0]?.signal?.addEventListener("abort", () => { + reject(new Error("AbortError")); + }); + }), + ]) + : await promise; } catch (e) { await Promise.all( runManagers.map((runManager) => runManager?.handleChainError(e)) @@ -509,6 +533,7 @@ export abstract class Runnable< undefined, config.runName ?? this.getName() ), + options?.signal, config ); delete config.runId; @@ -1750,14 +1775,27 @@ export class RunnableSequence< const initialSteps = [this.first, ...this.middle]; for (let i = 0; i < initialSteps.length; i += 1) { const step = initialSteps[i]; - nextStepInput = await step.invoke( + const promise = step.invoke( nextStepInput, patchConfig(config, { callbacks: runManager?.getChild(`seq:step:${i + 1}`), }) ); + nextStepInput = options?.signal + ? await Promise.race([ + promise, + new Promise((_, reject) => { + options.signal?.addEventListener("abort", () => + reject(new Error("Aborted")) + ); + }), + ]) + : await promise; } // TypeScript can't detect that the last output of the sequence returns RunOutput, so call it out of the loop here + if (options?.signal?.aborted) { + throw new Error("Aborted"); + } finalOutput = await this.last.invoke( nextStepInput, patchConfig(config, { @@ -1819,7 +1857,7 @@ export class RunnableSequence< try { for (let i = 0; i < this.steps.length; i += 1) { const step = this.steps[i]; - nextStepInputs = await step.batch( + const promise = step.batch( nextStepInputs, runManagers.map((runManager, j) => { const childRunManager = runManager?.getChild(`seq:step:${i + 1}`); @@ -1827,6 +1865,16 @@ export class RunnableSequence< }), batchOptions ); + nextStepInputs = configList[0]?.signal + ? await Promise.race([ + promise, + new Promise((_, reject) => { + configList[0]?.signal?.addEventListener("abort", () => + reject(new Error("Aborted")) + ); + }), + ]) + : await promise; } } catch (e) { await Promise.all( @@ -1880,6 +1928,7 @@ export class RunnableSequence< ); } for await (const chunk of finalGenerator) { + options?.signal?.throwIfAborted(); yield chunk; if (concatSupported) { if (finalOutput === undefined) { @@ -2058,16 +2107,26 @@ export class RunnableMap< // eslint-disable-next-line @typescript-eslint/no-explicit-any const output: Record = {}; try { - await Promise.all( - Object.entries(this.steps).map(async ([key, runnable]) => { + const promises = Object.entries(this.steps).map( + async ([key, runnable]) => { output[key] = await runnable.invoke( input, patchConfig(config, { callbacks: runManager?.getChild(`map:key:${key}`), }) ); - }) + } ); + if (options?.signal) { + promises.push( + new Promise((_, reject) => { + options.signal?.addEventListener("abort", () => + reject(new Error("Aborted")) + ); + }) + ); + } + await Promise.all(promises); } catch (e) { await runManager?.handleChainError(e); throw e; @@ -2101,7 +2160,17 @@ export class RunnableMap< // starting new iterations as needed, // until all iterators are done while (tasks.size) { - const { key, result, gen } = await Promise.race(tasks.values()); + const promise = Promise.race(tasks.values()); + const { key, result, gen } = options?.signal + ? await Promise.race([ + promise, + new Promise((_, reject) => { + options.signal?.addEventListener("abort", () => + reject(new Error("Aborted")) + ); + }), + ]) + : await promise; tasks.delete(key); if (!result.done) { yield { [key]: result.value } as unknown as RunOutput; @@ -2172,21 +2241,33 @@ export class RunnableTraceable extends Runnable< async invoke(input: RunInput, options?: Partial) { const [config] = this._getOptionsList(options ?? {}, 1); const callbacks = await getCallbackManagerForConfig(config); - - return (await this.func( + const promise = this.func( patchConfig(config, { callbacks }), input - )) as RunOutput; + ) as Promise; + + return config?.signal + ? Promise.race([ + promise, + new Promise((_, reject) => { + config.signal?.addEventListener("abort", () => + reject(new Error("Aborted")) + ); + }), + ]) + : await promise; } async *_streamIterator( input: RunInput, options?: Partial ): AsyncGenerator { + const [config] = this._getOptionsList(options ?? {}, 1); const result = await this.invoke(input, options); if (isAsyncIterable(result)) { for await (const item of result) { + config?.signal?.throwIfAborted(); yield item as RunOutput; } return; @@ -2194,6 +2275,7 @@ export class RunnableTraceable extends Runnable< if (isIterator(result)) { while (true) { + config?.signal?.throwIfAborted(); const state: IteratorResult = result.next(); if (state.done) break; yield state.value as RunOutput; @@ -2320,6 +2402,7 @@ export class RunnableLambda extends Runnable< childConfig, output )) { + config?.signal?.throwIfAborted(); if (finalOutput === undefined) { finalOutput = chunk as RunOutput; } else { @@ -2339,6 +2422,7 @@ export class RunnableLambda extends Runnable< childConfig, output )) { + config?.signal?.throwIfAborted(); if (finalOutput === undefined) { finalOutput = chunk as RunOutput; } else { @@ -2423,10 +2507,12 @@ export class RunnableLambda extends Runnable< childConfig, output )) { + config?.signal?.throwIfAborted(); yield chunk as RunOutput; } } else if (isIterableIterator(output)) { for (const chunk of consumeIteratorInContext(childConfig, output)) { + config?.signal?.throwIfAborted(); yield chunk as RunOutput; } } else { @@ -2517,6 +2603,7 @@ export class RunnableWithFallbacks extends Runnable< ); let firstError; for (const runnable of this.runnables()) { + config?.signal?.throwIfAborted(); try { const output = await runnable.invoke( input, @@ -2586,6 +2673,7 @@ export class RunnableWithFallbacks extends Runnable< // eslint-disable-next-line @typescript-eslint/no-explicit-any let firstError: any; for (const runnable of this.runnables()) { + configList[0].signal?.throwIfAborted(); try { const outputs = await runnable.batch( inputs, diff --git a/langchain-core/src/runnables/config.ts b/langchain-core/src/runnables/config.ts index 409d556eac8d..04687ee93a1d 100644 --- a/langchain-core/src/runnables/config.ts +++ b/langchain-core/src/runnables/config.ts @@ -31,6 +31,18 @@ export function mergeConfigs( copy[key] = [...new Set(baseKeys.concat(options[key] ?? []))]; } else if (key === "configurable") { copy[key] = { ...copy[key], ...options[key] }; + } else if (key === "timeout") { + if (copy.timeout === undefined) { + copy.timeout = options.timeout; + } else if (options.timeout !== undefined) { + copy.timeout = Math.min(copy.timeout, options.timeout); + } + } else if (key === "signal") { + if (copy.signal === undefined) { + copy.signal = options.signal; + } else if (options.signal !== undefined) { + copy.signal = AbortSignal.any([copy.signal, options.signal]); + } } else if (key === "callbacks") { const baseCallbacks = copy.callbacks; const providedCallbacks = options.callbacks; @@ -155,6 +167,18 @@ export function ensureConfig( } } } + if (empty.timeout !== undefined) { + if (empty.timeout <= 0) { + throw new Error("Timeout must be a positive number"); + } + const timeoutSignal = AbortSignal.timeout(empty.timeout); + if (empty.signal !== undefined) { + empty.signal = AbortSignal.any([empty.signal, timeoutSignal]); + } else { + empty.signal = timeoutSignal; + } + delete empty.timeout; + } return empty as CallOptions; } diff --git a/langchain-core/src/runnables/remote.ts b/langchain-core/src/runnables/remote.ts index 9ecd597556f9..dc08c731a501 100644 --- a/langchain-core/src/runnables/remote.ts +++ b/langchain-core/src/runnables/remote.ts @@ -214,11 +214,12 @@ function deserialize(str: string): RunOutput { return revive(obj); } -function removeCallbacks( +function removeCallbacksAndSignal( options?: RunnableConfig -): Omit { +): Omit { const rest = { ...options }; delete rest.callbacks; + delete rest.signal; return rest; } @@ -276,7 +277,7 @@ export class RemoteRunnable< this.options = options; } - private async post(path: string, body: Body) { + private async post(path: string, body: Body, signal?: AbortSignal) { return fetch(`${this.url}${path}`, { method: "POST", body: JSON.stringify(serialize(body)), @@ -284,7 +285,7 @@ export class RemoteRunnable< "Content-Type": "application/json", ...this.options?.headers, }, - signal: AbortSignal.timeout(this.options?.timeout ?? 60000), + signal: signal ?? AbortSignal.timeout(this.options?.timeout ?? 60000), }); } @@ -299,11 +300,15 @@ export class RemoteRunnable< input: RunInput; config?: RunnableConfig; kwargs?: Omit, keyof RunnableConfig>; - }>("/invoke", { - input, - config: removeCallbacks(config), - kwargs: kwargs ?? {}, - }); + }>( + "/invoke", + { + input, + config: removeCallbacksAndSignal(config), + kwargs: kwargs ?? {}, + }, + config.signal + ); if (!response.ok) { throw new Error(`${response.status} Error: ${await response.text()}`); } @@ -347,13 +352,17 @@ export class RemoteRunnable< inputs: RunInput[]; config?: (RunnableConfig & RunnableBatchOptions)[]; kwargs?: Omit, keyof RunnableConfig>[]; - }>("/batch", { - inputs, - config: (configs ?? []) - .map(removeCallbacks) - .map((config) => ({ ...config, ...batchOptions })), - kwargs, - }); + }>( + "/batch", + { + inputs, + config: (configs ?? []) + .map(removeCallbacksAndSignal) + .map((config) => ({ ...config, ...batchOptions })), + kwargs, + }, + options?.[0]?.signal + ); if (!response.ok) { throw new Error(`${response.status} Error: ${await response.text()}`); } @@ -422,11 +431,15 @@ export class RemoteRunnable< input: RunInput; config?: RunnableConfig; kwargs?: Omit, keyof RunnableConfig>; - }>("/stream", { - input, - config: removeCallbacks(config), - kwargs, - }); + }>( + "/stream", + { + input, + config: removeCallbacksAndSignal(config), + kwargs, + }, + config.signal + ); if (!response.ok) { const json = await response.json(); const error = new Error( @@ -502,13 +515,17 @@ export class RemoteRunnable< config?: RunnableConfig; kwargs?: Omit, keyof RunnableConfig>; diff: false; - }>("/stream_log", { - input, - config: removeCallbacks(config), - kwargs, - ...camelCaseStreamOptions, - diff: false, - }); + }>( + "/stream_log", + { + input, + config: removeCallbacksAndSignal(config), + kwargs, + ...camelCaseStreamOptions, + diff: false, + }, + config.signal + ); const { body, ok } = response; if (!ok) { throw new Error(`${response.status} Error: ${await response.text()}`); @@ -574,13 +591,17 @@ export class RemoteRunnable< config?: RunnableConfig; kwargs?: Omit, keyof RunnableConfig>; diff: false; - }>("/stream_events", { - input, - config: removeCallbacks(config), - kwargs, - ...camelCaseStreamOptions, - diff: false, - }); + }>( + "/stream_events", + { + input, + config: removeCallbacksAndSignal(config), + kwargs, + ...camelCaseStreamOptions, + diff: false, + }, + config.signal + ); const { body, ok } = response; if (!ok) { throw new Error(`${response.status} Error: ${await response.text()}`); diff --git a/langchain-core/src/runnables/types.ts b/langchain-core/src/runnables/types.ts index 569e8aa26c0e..e7ddfa8c3852 100644 --- a/langchain-core/src/runnables/types.ts +++ b/langchain-core/src/runnables/types.ts @@ -89,4 +89,16 @@ export interface RunnableConfig extends BaseCallbackConfig { /** Maximum number of parallel calls to make. */ maxConcurrency?: number; + + /** + * Timeout for this call in milliseconds. + */ + timeout?: number; + + /** + * Abort signal for this call. + * If provided, the call will be aborted when the signal is aborted. + * @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal + */ + signal?: AbortSignal; } diff --git a/langchain-core/src/utils/stream.ts b/langchain-core/src/utils/stream.ts index 234cec3b900f..31ac8907b1c4 100644 --- a/langchain-core/src/utils/stream.ts +++ b/langchain-core/src/utils/stream.ts @@ -186,6 +186,8 @@ export class AsyncGeneratorWithSetup< public config?: unknown; + public signal?: AbortSignal; + private firstResult: Promise>; private firstResultUsed = false; @@ -194,9 +196,11 @@ export class AsyncGeneratorWithSetup< generator: AsyncGenerator; startSetup?: () => Promise; config?: unknown; + signal?: AbortSignal; }) { this.generator = params.generator; this.config = params.config; + this.signal = params.signal; // setup is a promise that resolves only after the first iterator value // is available. this is useful when setup of several piped generators // needs to happen in logical order, ie. in the order in which input to @@ -218,6 +222,8 @@ export class AsyncGeneratorWithSetup< } async next(...args: [] | [TNext]): Promise> { + this.signal?.throwIfAborted(); + if (!this.firstResultUsed) { this.firstResultUsed = true; return this.firstResult; @@ -225,9 +231,20 @@ export class AsyncGeneratorWithSetup< return AsyncLocalStorageProviderSingleton.runWithConfig( this.config, - async () => { - return this.generator.next(...args); - }, + this.signal + ? async () => { + return Promise.race([ + this.generator.next(...args), + new Promise((_resolve, reject) => { + this.signal?.addEventListener("abort", () => { + reject(new Error("Aborted")); + }); + }), + ]); + } + : async () => { + return this.generator.next(...args); + }, true ); } @@ -264,11 +281,13 @@ export async function pipeGeneratorWithSetup< ) => AsyncGenerator, generator: AsyncGenerator, startSetup: () => Promise, + signal: AbortSignal | undefined, ...args: A ) { const gen = new AsyncGeneratorWithSetup({ generator, startSetup, + signal, }); const setup = await gen.setup; return { output: to(gen, setup, ...args), setup }; From 9a4fc4be950ad2b65ff0ad47377a09b60dd93f20 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Wed, 31 Jul 2024 17:56:39 -0700 Subject: [PATCH 02/16] Lint --- langchain-core/src/runnables/config.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/langchain-core/src/runnables/config.ts b/langchain-core/src/runnables/config.ts index 04687ee93a1d..247c08af3805 100644 --- a/langchain-core/src/runnables/config.ts +++ b/langchain-core/src/runnables/config.ts @@ -41,7 +41,11 @@ export function mergeConfigs( if (copy.signal === undefined) { copy.signal = options.signal; } else if (options.signal !== undefined) { - copy.signal = AbortSignal.any([copy.signal, options.signal]); + if ("any" in AbortSignal) { + copy.signal = AbortSignal.any([copy.signal, options.signal]); + } else { + copy.signal = options.signal; + } } } else if (key === "callbacks") { const baseCallbacks = copy.callbacks; @@ -173,7 +177,9 @@ export function ensureConfig( } const timeoutSignal = AbortSignal.timeout(empty.timeout); if (empty.signal !== undefined) { - empty.signal = AbortSignal.any([empty.signal, timeoutSignal]); + if ("any" in AbortSignal) { + empty.signal = AbortSignal.any([empty.signal, timeoutSignal]); + } } else { empty.signal = timeoutSignal; } From 52e38d8555103dfeec7ed35cd30b2597abe4d871 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Wed, 31 Jul 2024 22:17:28 -0700 Subject: [PATCH 03/16] Fix build --- langchain-core/src/runnables/config.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/langchain-core/src/runnables/config.ts b/langchain-core/src/runnables/config.ts index 247c08af3805..08d9d7366d2c 100644 --- a/langchain-core/src/runnables/config.ts +++ b/langchain-core/src/runnables/config.ts @@ -42,7 +42,8 @@ export function mergeConfigs( copy.signal = options.signal; } else if (options.signal !== undefined) { if ("any" in AbortSignal) { - copy.signal = AbortSignal.any([copy.signal, options.signal]); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + copy.signal = (AbortSignal as any).any([copy.signal, options.signal]); } else { copy.signal = options.signal; } @@ -178,7 +179,8 @@ export function ensureConfig( const timeoutSignal = AbortSignal.timeout(empty.timeout); if (empty.signal !== undefined) { if ("any" in AbortSignal) { - empty.signal = AbortSignal.any([empty.signal, timeoutSignal]); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + empty.signal = (AbortSignal as any).any([empty.signal, timeoutSignal]); } } else { empty.signal = timeoutSignal; From 5cb5cfcaad00ae14c4ed765f78913d11a2edc64e Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Wed, 31 Jul 2024 22:37:13 -0700 Subject: [PATCH 04/16] Refactor race logic into a util --- langchain-core/src/runnables/base.ts | 70 ++++---------------------- langchain-core/src/runnables/config.ts | 5 +- langchain-core/src/utils/signal.ts | 17 +++++++ langchain-core/src/utils/stream.ts | 10 +--- 4 files changed, 33 insertions(+), 69 deletions(-) create mode 100644 langchain-core/src/utils/signal.ts diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index ba19752c8f10..7732812559f0 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -30,6 +30,7 @@ import { pipeGeneratorWithSetup, AsyncGeneratorWithSetup, } from "../utils/stream.js"; +import { raceWithSignal } from "../utils/signal.js"; import { DEFAULT_RECURSION_LIMIT, RunnableConfig, @@ -383,16 +384,7 @@ export abstract class Runnable< let output; try { const promise = func.call(this, input, config, runManager); - output = options?.signal - ? await Promise.race([ - promise, - new Promise((_, reject) => { - options.signal?.addEventListener("abort", () => { - reject(new Error("AbortError")); - }); - }), - ]) - : await promise; + output = await raceWithSignal(promise, options?.signal); } catch (e) { await runManager?.handleChainError(e); throw e; @@ -451,16 +443,7 @@ export abstract class Runnable< runManagers, batchOptions ); - outputs = optionsList?.[0]?.signal - ? await Promise.race([ - promise, - new Promise((_, reject) => { - optionsList?.[0]?.signal?.addEventListener("abort", () => { - reject(new Error("AbortError")); - }); - }), - ]) - : await promise; + outputs = await raceWithSignal(promise, optionsList?.[0]?.signal); } catch (e) { await Promise.all( runManagers.map((runManager) => runManager?.handleChainError(e)) @@ -1781,16 +1764,7 @@ export class RunnableSequence< callbacks: runManager?.getChild(`seq:step:${i + 1}`), }) ); - nextStepInput = options?.signal - ? await Promise.race([ - promise, - new Promise((_, reject) => { - options.signal?.addEventListener("abort", () => - reject(new Error("Aborted")) - ); - }), - ]) - : await promise; + nextStepInput = await raceWithSignal(promise, options?.signal); } // TypeScript can't detect that the last output of the sequence returns RunOutput, so call it out of the loop here if (options?.signal?.aborted) { @@ -1865,16 +1839,7 @@ export class RunnableSequence< }), batchOptions ); - nextStepInputs = configList[0]?.signal - ? await Promise.race([ - promise, - new Promise((_, reject) => { - configList[0]?.signal?.addEventListener("abort", () => - reject(new Error("Aborted")) - ); - }), - ]) - : await promise; + nextStepInputs = await raceWithSignal(promise, configList[0]?.signal); } } catch (e) { await Promise.all( @@ -2161,16 +2126,10 @@ export class RunnableMap< // until all iterators are done while (tasks.size) { const promise = Promise.race(tasks.values()); - const { key, result, gen } = options?.signal - ? await Promise.race([ - promise, - new Promise((_, reject) => { - options.signal?.addEventListener("abort", () => - reject(new Error("Aborted")) - ); - }), - ]) - : await promise; + const { key, result, gen } = await raceWithSignal( + promise, + options?.signal + ); tasks.delete(key); if (!result.done) { yield { [key]: result.value } as unknown as RunOutput; @@ -2246,16 +2205,7 @@ export class RunnableTraceable extends Runnable< input ) as Promise; - return config?.signal - ? Promise.race([ - promise, - new Promise((_, reject) => { - config.signal?.addEventListener("abort", () => - reject(new Error("Aborted")) - ); - }), - ]) - : await promise; + return raceWithSignal(promise, config?.signal); } async *_streamIterator( diff --git a/langchain-core/src/runnables/config.ts b/langchain-core/src/runnables/config.ts index 08d9d7366d2c..8fa9a244ee3d 100644 --- a/langchain-core/src/runnables/config.ts +++ b/langchain-core/src/runnables/config.ts @@ -43,7 +43,10 @@ export function mergeConfigs( } else if (options.signal !== undefined) { if ("any" in AbortSignal) { // eslint-disable-next-line @typescript-eslint/no-explicit-any - copy.signal = (AbortSignal as any).any([copy.signal, options.signal]); + copy.signal = (AbortSignal as any).any([ + copy.signal, + options.signal, + ]); } else { copy.signal = options.signal; } diff --git a/langchain-core/src/utils/signal.ts b/langchain-core/src/utils/signal.ts new file mode 100644 index 000000000000..b2f339911033 --- /dev/null +++ b/langchain-core/src/utils/signal.ts @@ -0,0 +1,17 @@ +export async function raceWithSignal( + promise: Promise, + signal?: AbortSignal +): Promise { + if (signal === undefined) { + return promise; + } + if (signal.aborted) { + throw new Error("AbortError"); + } + return Promise.race([ + promise, + new Promise((_, reject) => { + signal.addEventListener("abort", () => reject(new Error("Aborted"))); + }), + ]); +} diff --git a/langchain-core/src/utils/stream.ts b/langchain-core/src/utils/stream.ts index 31ac8907b1c4..f40e997d23fb 100644 --- a/langchain-core/src/utils/stream.ts +++ b/langchain-core/src/utils/stream.ts @@ -1,4 +1,5 @@ import { AsyncLocalStorageProviderSingleton } from "../singletons/index.js"; +import { raceWithSignal } from "./signal.js"; // Make this a type to override ReadableStream's async iterator type in case // the popular web-streams-polyfill is imported - the supplied types @@ -233,14 +234,7 @@ export class AsyncGeneratorWithSetup< this.config, this.signal ? async () => { - return Promise.race([ - this.generator.next(...args), - new Promise((_resolve, reject) => { - this.signal?.addEventListener("abort", () => { - reject(new Error("Aborted")); - }); - }), - ]); + return raceWithSignal(this.generator.next(...args), this.signal); } : async () => { return this.generator.next(...args); From df05fc9b0cd23d29f8ea7bc225e6356e946bad8d Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Wed, 31 Jul 2024 23:07:28 -0700 Subject: [PATCH 05/16] Relax typing in tests --- .../src/integration_tests/chat_models.ts | 82 ++++++++++--------- 1 file changed, 42 insertions(+), 40 deletions(-) diff --git a/libs/langchain-standard-tests/src/integration_tests/chat_models.ts b/libs/langchain-standard-tests/src/integration_tests/chat_models.ts index 8fff150f1cf5..5fdb651e2a7d 100644 --- a/libs/langchain-standard-tests/src/integration_tests/chat_models.ts +++ b/libs/langchain-standard-tests/src/integration_tests/chat_models.ts @@ -1,3 +1,5 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ + import { expect } from "@jest/globals"; import { BaseChatModelCallOptions } from "@langchain/core/language_models/chat_models"; import { @@ -112,11 +114,11 @@ export abstract class ChatModelIntegrationTests< * 1. The result is defined and is an instance of the correct type. * 2. The content of the response is a non-empty string. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testInvoke( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { // Create a new instance of the chat model const chatModel = new this.Cls(this.constructorArgs); @@ -147,11 +149,11 @@ export abstract class ChatModelIntegrationTests< * 2. The content of each token is a string. * 3. The total number of characters streamed is greater than zero. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testStream( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { const chatModel = new this.Cls(this.constructorArgs); let numChars = 0; @@ -183,11 +185,11 @@ export abstract class ChatModelIntegrationTests< * 2. The number of results matches the number of inputs. * 3. Each result is of the correct type and has non-empty content. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testBatch( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { const chatModel = new this.Cls(this.constructorArgs); @@ -229,11 +231,11 @@ export abstract class ChatModelIntegrationTests< * * Finally, it verifies the final chunk's `event.data.output` field * matches the concatenated content of all `on_chat_model_stream` events. - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testStreamEvents( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { const chatModel = new this.Cls(this.constructorArgs); @@ -300,11 +302,11 @@ export abstract class ChatModelIntegrationTests< * 1. The result is defined and is an instance of the correct response type. * 2. The content of the response is a non-empty string. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testConversation( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { // Create a new instance of the chat model const chatModel = new this.Cls(this.constructorArgs); @@ -343,11 +345,11 @@ export abstract class ChatModelIntegrationTests< * 3. The `usage_metadata` field contains `input_tokens`, `output_tokens`, and `total_tokens`, * all of which are numbers. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testUsageMetadata( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { // Create a new instance of the chat model const chatModel = new this.Cls(this.constructorArgs); @@ -393,11 +395,11 @@ export abstract class ChatModelIntegrationTests< * 3. The `usage_metadata` field contains `input_tokens`, `output_tokens`, and `total_tokens`, * all of which are numbers. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testUsageMetadataStreaming( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { const chatModel = new this.Cls(this.constructorArgs); let finalChunks: AIMessageChunk | undefined; @@ -451,11 +453,11 @@ export abstract class ChatModelIntegrationTests< * This test ensures that the model can correctly process and respond to complex message * histories that include tool calls with string-based content structures. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testToolMessageHistoriesStringContent( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { // Skip the test if the model doesn't support tool calling if (!this.chatModelHasToolCalling) { @@ -522,10 +524,10 @@ export abstract class ChatModelIntegrationTests< * This test ensures that the model can correctly process and respond to complex message * histories that include tool calls with list-based content structures. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. */ async testToolMessageHistoriesListContent( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { if (!this.chatModelHasToolCalling) { console.log("Test requires tool calling. Skipping..."); @@ -602,11 +604,11 @@ export abstract class ChatModelIntegrationTests< * the patterns demonstrated in few-shot examples, particularly when those * examples involve tool usage. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testStructuredFewShotExamples( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { // Skip the test if the model doesn't support tool calling if (!this.chatModelHasToolCalling) { @@ -667,11 +669,11 @@ export abstract class ChatModelIntegrationTests< * This test is crucial for ensuring that the model can generate responses * in a specific format, which is useful for tasks requiring structured data output. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testWithStructuredOutput( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { // Skip the test if the model doesn't support structured output if (!this.chatModelHasStructuredOutput) { @@ -726,11 +728,11 @@ export abstract class ChatModelIntegrationTests< * This test is crucial for ensuring that the model can generate responses in a specific format * while also providing access to the original, unprocessed model output. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testWithStructuredOutputIncludeRaw( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { // Skip the test if the model doesn't support structured output if (!this.chatModelHasStructuredOutput) { @@ -788,11 +790,11 @@ export abstract class ChatModelIntegrationTests< * This test is crucial for ensuring compatibility with OpenAI's function * calling format, which is a common standard in AI tool integration. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testBindToolsWithOpenAIFormattedTools( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { // Skip the test if the model doesn't support tool calling if (!this.chatModelHasToolCalling) { @@ -855,11 +857,11 @@ export abstract class ChatModelIntegrationTests< * from Runnable objects, which provides a flexible way to integrate * custom logic into the model's tool-calling capabilities. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testBindToolsWithRunnableToolLike( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { // Skip the test if the model doesn't support tool calling if (!this.chatModelHasToolCalling) { @@ -923,11 +925,11 @@ export abstract class ChatModelIntegrationTests< * This test is crucial for ensuring that the caching mechanism works correctly * with various message structures, maintaining consistency and efficiency. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testCacheComplexMessageTypes( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { // Create a new instance of the chat model with caching enabled const model = new this.Cls({ @@ -987,11 +989,11 @@ export abstract class ChatModelIntegrationTests< * 3. The usage metadata is present in the streamed result. * 4. Both input and output tokens are present and greater than zero in the usage metadata. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testStreamTokensWithToolCalls( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { const model = new this.Cls(this.constructorArgs); if (!model.bindTools) { @@ -1053,11 +1055,11 @@ export abstract class ChatModelIntegrationTests< * 5. Send a followup request including the tool call and response. * 6. Verify the model generates a non-empty final response. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testModelCanUseToolUseAIMessage( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { if (!this.chatModelHasToolCalling) { console.log("Test requires tool calling. Skipping..."); @@ -1147,11 +1149,11 @@ export abstract class ChatModelIntegrationTests< * 5. Stream a followup request including the tool call and response. * 6. Verify the model generates a non-empty final streamed response. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testModelCanUseToolUseAIMessageWithStreaming( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { if (!this.chatModelHasToolCalling) { console.log("Test requires tool calling. Skipping..."); @@ -1253,11 +1255,11 @@ export abstract class ChatModelIntegrationTests< * This test is particularly important for ensuring compatibility with APIs * that may not accept JSON schemas with unknown object fields (e.g., Google's API). * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ async testInvokeMoreComplexTools( - callOptions?: InstanceType["ParsedCallOptions"] + callOptions?: any ) { // Skip the test if the model doesn't support tool calling if (!this.chatModelHasToolCalling) { @@ -1333,11 +1335,11 @@ Extraction path: {extractionPath}`, * It ensures that the model can correctly process and respond to prompts requiring multiple tool calls, * both in streaming and non-streaming contexts, and can handle message histories with parallel tool calls. * - * @param {InstanceType["ParsedCallOptions"] | undefined} callOptions Optional call options to pass to the model. + * @param {any | undefined} callOptions Optional call options to pass to the model. * @param {boolean} onlyVerifyHistory If true, only verifies the message history test. */ async testParallelToolCalling( - callOptions?: InstanceType["ParsedCallOptions"], + callOptions?: any, onlyVerifyHistory = false ) { // Skip the test if the model doesn't support tool calling From 765811916485adfd42dad5b57dadb839aabcdd5a Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Wed, 31 Jul 2024 23:21:32 -0700 Subject: [PATCH 06/16] Fix types --- libs/langchain-community/src/llms/layerup_security.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libs/langchain-community/src/llms/layerup_security.ts b/libs/langchain-community/src/llms/layerup_security.ts index e60676094892..0a3c8be61dd8 100644 --- a/libs/langchain-community/src/llms/layerup_security.ts +++ b/libs/langchain-community/src/llms/layerup_security.ts @@ -2,6 +2,7 @@ import { LLM, BaseLLM, type BaseLLMParams, + BaseLLMCallOptions, } from "@langchain/core/language_models/llms"; import { GuardrailResponse, @@ -101,7 +102,7 @@ export class LayerupSecurity extends LLM { return "layerup_security"; } - async _call(input: string, options?: BaseLLMParams): Promise { + async _call(input: string, options?: BaseLLMCallOptions): Promise { // Since LangChain LLMs only support string inputs, we will wrap each call to Layerup in a single-message // array of messages, then extract the string element when we need to access it. let messages: LLMMessage[] = [ From 1be7dfac62e66391a7e551a4a18e2923bd79240f Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Wed, 31 Jul 2024 23:24:35 -0700 Subject: [PATCH 07/16] Formatting --- .../src/integration_tests/chat_models.ts | 81 +++++-------------- 1 file changed, 20 insertions(+), 61 deletions(-) diff --git a/libs/langchain-standard-tests/src/integration_tests/chat_models.ts b/libs/langchain-standard-tests/src/integration_tests/chat_models.ts index 5fdb651e2a7d..016a85f2810a 100644 --- a/libs/langchain-standard-tests/src/integration_tests/chat_models.ts +++ b/libs/langchain-standard-tests/src/integration_tests/chat_models.ts @@ -117,9 +117,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testInvoke( - callOptions?: any - ) { + async testInvoke(callOptions?: any) { // Create a new instance of the chat model const chatModel = new this.Cls(this.constructorArgs); @@ -152,9 +150,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testStream( - callOptions?: any - ) { + async testStream(callOptions?: any) { const chatModel = new this.Cls(this.constructorArgs); let numChars = 0; @@ -188,9 +184,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testBatch( - callOptions?: any - ) { + async testBatch(callOptions?: any) { const chatModel = new this.Cls(this.constructorArgs); // Process two simple prompts in batch @@ -234,9 +228,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testStreamEvents( - callOptions?: any - ) { + async testStreamEvents(callOptions?: any) { const chatModel = new this.Cls(this.constructorArgs); const stream = chatModel.streamEvents("Hello", { @@ -305,9 +297,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testConversation( - callOptions?: any - ) { + async testConversation(callOptions?: any) { // Create a new instance of the chat model const chatModel = new this.Cls(this.constructorArgs); @@ -348,9 +338,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testUsageMetadata( - callOptions?: any - ) { + async testUsageMetadata(callOptions?: any) { // Create a new instance of the chat model const chatModel = new this.Cls(this.constructorArgs); @@ -398,9 +386,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testUsageMetadataStreaming( - callOptions?: any - ) { + async testUsageMetadataStreaming(callOptions?: any) { const chatModel = new this.Cls(this.constructorArgs); let finalChunks: AIMessageChunk | undefined; @@ -456,9 +442,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testToolMessageHistoriesStringContent( - callOptions?: any - ) { + async testToolMessageHistoriesStringContent(callOptions?: any) { // Skip the test if the model doesn't support tool calling if (!this.chatModelHasToolCalling) { console.log("Test requires tool calling. Skipping..."); @@ -526,9 +510,7 @@ export abstract class ChatModelIntegrationTests< * * @param {any | undefined} callOptions Optional call options to pass to the model. */ - async testToolMessageHistoriesListContent( - callOptions?: any - ) { + async testToolMessageHistoriesListContent(callOptions?: any) { if (!this.chatModelHasToolCalling) { console.log("Test requires tool calling. Skipping..."); return; @@ -607,9 +589,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testStructuredFewShotExamples( - callOptions?: any - ) { + async testStructuredFewShotExamples(callOptions?: any) { // Skip the test if the model doesn't support tool calling if (!this.chatModelHasToolCalling) { console.log("Test requires tool calling. Skipping..."); @@ -672,9 +652,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testWithStructuredOutput( - callOptions?: any - ) { + async testWithStructuredOutput(callOptions?: any) { // Skip the test if the model doesn't support structured output if (!this.chatModelHasStructuredOutput) { console.log("Test requires withStructuredOutput. Skipping..."); @@ -731,9 +709,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testWithStructuredOutputIncludeRaw( - callOptions?: any - ) { + async testWithStructuredOutputIncludeRaw(callOptions?: any) { // Skip the test if the model doesn't support structured output if (!this.chatModelHasStructuredOutput) { console.log("Test requires withStructuredOutput. Skipping..."); @@ -793,9 +769,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testBindToolsWithOpenAIFormattedTools( - callOptions?: any - ) { + async testBindToolsWithOpenAIFormattedTools(callOptions?: any) { // Skip the test if the model doesn't support tool calling if (!this.chatModelHasToolCalling) { console.log("Test requires tool calling. Skipping..."); @@ -860,9 +834,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testBindToolsWithRunnableToolLike( - callOptions?: any - ) { + async testBindToolsWithRunnableToolLike(callOptions?: any) { // Skip the test if the model doesn't support tool calling if (!this.chatModelHasToolCalling) { console.log("Test requires tool calling. Skipping..."); @@ -928,9 +900,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testCacheComplexMessageTypes( - callOptions?: any - ) { + async testCacheComplexMessageTypes(callOptions?: any) { // Create a new instance of the chat model with caching enabled const model = new this.Cls({ ...this.constructorArgs, @@ -992,9 +962,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testStreamTokensWithToolCalls( - callOptions?: any - ) { + async testStreamTokensWithToolCalls(callOptions?: any) { const model = new this.Cls(this.constructorArgs); if (!model.bindTools) { throw new Error("bindTools is undefined"); @@ -1058,9 +1026,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testModelCanUseToolUseAIMessage( - callOptions?: any - ) { + async testModelCanUseToolUseAIMessage(callOptions?: any) { if (!this.chatModelHasToolCalling) { console.log("Test requires tool calling. Skipping..."); return; @@ -1152,9 +1118,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testModelCanUseToolUseAIMessageWithStreaming( - callOptions?: any - ) { + async testModelCanUseToolUseAIMessageWithStreaming(callOptions?: any) { if (!this.chatModelHasToolCalling) { console.log("Test requires tool calling. Skipping..."); return; @@ -1258,9 +1222,7 @@ export abstract class ChatModelIntegrationTests< * @param {any | undefined} callOptions Optional call options to pass to the model. * These options will be applied to the model at runtime. */ - async testInvokeMoreComplexTools( - callOptions?: any - ) { + async testInvokeMoreComplexTools(callOptions?: any) { // Skip the test if the model doesn't support tool calling if (!this.chatModelHasToolCalling) { console.log("Test requires tool calling. Skipping..."); @@ -1338,10 +1300,7 @@ Extraction path: {extractionPath}`, * @param {any | undefined} callOptions Optional call options to pass to the model. * @param {boolean} onlyVerifyHistory If true, only verifies the message history test. */ - async testParallelToolCalling( - callOptions?: any, - onlyVerifyHistory = false - ) { + async testParallelToolCalling(callOptions?: any, onlyVerifyHistory = false) { // Skip the test if the model doesn't support tool calling if (!this.chatModelHasToolCalling) { console.log("Test requires tool calling. Skipping..."); From 297205560dba7df6da0a568d5a3eb8979d01bddf Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Thu, 1 Aug 2024 11:40:19 -0700 Subject: [PATCH 08/16] Fix type --- langchain-core/src/language_models/chat_models.ts | 4 +++- langchain-core/src/language_models/llms.ts | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/langchain-core/src/language_models/chat_models.ts b/langchain-core/src/language_models/chat_models.ts index b60baecb3a20..6257a6ca515a 100644 --- a/langchain-core/src/language_models/chat_models.ts +++ b/langchain-core/src/language_models/chat_models.ts @@ -143,9 +143,11 @@ export abstract class BaseChatModel< // TODO: Fix the parameter order on the next minor version. OutputMessageType extends BaseMessageChunk = BaseMessageChunk > extends BaseLanguageModel { + // Backwards compatibility since signal and timeout have been moved to + // RunnableConfig declare ParsedCallOptions: Omit< CallOptions, - Exclude + Exclude >; // Only ever instantiated in main LangChain diff --git a/langchain-core/src/language_models/llms.ts b/langchain-core/src/language_models/llms.ts index f6df3677dfc1..fa2684481cdf 100644 --- a/langchain-core/src/language_models/llms.ts +++ b/langchain-core/src/language_models/llms.ts @@ -63,7 +63,7 @@ export abstract class BaseLLM< > extends BaseLanguageModel { declare ParsedCallOptions: Omit< CallOptions, - Exclude + Exclude >; // Only ever instantiated in main LangChain From 637b0bccdf32162d813d688d89833b6baa7af30e Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Thu, 1 Aug 2024 11:41:34 -0700 Subject: [PATCH 09/16] Fix type --- langchain-core/src/language_models/chat_models.ts | 5 ++--- langchain-core/src/language_models/llms.ts | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/langchain-core/src/language_models/chat_models.ts b/langchain-core/src/language_models/chat_models.ts index 6257a6ca515a..96a57348ffe4 100644 --- a/langchain-core/src/language_models/chat_models.ts +++ b/langchain-core/src/language_models/chat_models.ts @@ -143,11 +143,10 @@ export abstract class BaseChatModel< // TODO: Fix the parameter order on the next minor version. OutputMessageType extends BaseMessageChunk = BaseMessageChunk > extends BaseLanguageModel { - // Backwards compatibility since signal and timeout have been moved to - // RunnableConfig + // Backwards compatibility since fields have been moved to RunnableConfig declare ParsedCallOptions: Omit< CallOptions, - Exclude + Exclude >; // Only ever instantiated in main LangChain diff --git a/langchain-core/src/language_models/llms.ts b/langchain-core/src/language_models/llms.ts index fa2684481cdf..a8cc4e941960 100644 --- a/langchain-core/src/language_models/llms.ts +++ b/langchain-core/src/language_models/llms.ts @@ -61,9 +61,10 @@ interface LLMGenerateCachedParameters< export abstract class BaseLLM< CallOptions extends BaseLLMCallOptions = BaseLLMCallOptions > extends BaseLanguageModel { + // Backwards compatibility since fields have been moved to RunnableConfig declare ParsedCallOptions: Omit< CallOptions, - Exclude + Exclude >; // Only ever instantiated in main LangChain From 139b0a4d94c6d376c58ce35bda76a97daa301b04 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Thu, 1 Aug 2024 13:38:44 -0700 Subject: [PATCH 10/16] Fix runnable map, start adding tests --- langchain-core/src/runnables/base.ts | 11 +----- .../src/runnables/tests/runnable_map.test.ts | 39 +++++++++++++++++++ .../tests/runnable_stream_events_v2.test.ts | 21 ++++++++++ langchain-core/src/utils/signal.ts | 7 ++-- 4 files changed, 65 insertions(+), 13 deletions(-) diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index 7732812559f0..7d1f836ab4df 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -2082,16 +2082,7 @@ export class RunnableMap< ); } ); - if (options?.signal) { - promises.push( - new Promise((_, reject) => { - options.signal?.addEventListener("abort", () => - reject(new Error("Aborted")) - ); - }) - ); - } - await Promise.all(promises); + await raceWithSignal(Promise.all(promises), options?.signal); } catch (e) { await runManager?.handleChainError(e); throw e; diff --git a/langchain-core/src/runnables/tests/runnable_map.test.ts b/langchain-core/src/runnables/tests/runnable_map.test.ts index ffb41ab4e527..b854738871db 100644 --- a/langchain-core/src/runnables/tests/runnable_map.test.ts +++ b/langchain-core/src/runnables/tests/runnable_map.test.ts @@ -67,6 +67,45 @@ test("Test map inference in a sequence", async () => { ); }); +test("Test invoke with signal", async () => { + const map = RunnableMap.from({ + question: new RunnablePassthrough(), + context: async () => { + await new Promise((resolve) => setTimeout(resolve, 500)); + return "SOME STUFF"; + }, + }); + const controller = new AbortController(); + await expect(async () => { + await Promise.all([ + map.invoke("testing", { + signal: controller.signal, + }), + new Promise((resolve) => { + controller.abort(); + resolve(); + }), + ]); + }).rejects.toThrowError(); +}); + +test("Test stream with signal", async () => { + const map = RunnableMap.from({ + question: new RunnablePassthrough(), + context: async () => { + await new Promise((resolve) => setTimeout(resolve, 500)); + return "SOME STUFF"; + }, + }); + const controller = new AbortController(); + await expect(async () => { + const stream = await map.stream("TESTING", { signal: controller.signal }); + for await (const _ of stream) { + controller.abort(); + } + }).rejects.toThrowError(); +}); + test("Should not allow mismatched inputs", async () => { const prompt = ChatPromptTemplate.fromTemplate( "context: {context}, question: {question}" diff --git a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts index 17a39f9dde29..d1fc1ea7fc65 100644 --- a/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts +++ b/langchain-core/src/runnables/tests/runnable_stream_events_v2.test.ts @@ -2120,3 +2120,24 @@ test("Runnable streamEvents method with text/event-stream encoding", async () => expect(decoder.decode(events[3])).toEqual("event: end\n\n"); }); + +test("Runnable streamEvents method should respect passed signal", async () => { + const r = RunnableLambda.from(reverse); + + const chain = r + .withConfig({ runName: "1" }) + .pipe(r.withConfig({ runName: "2" })) + .pipe(r.withConfig({ runName: "3" })); + + const controller = new AbortController(); + const eventStream = await chain.streamEvents("hello", { + version: "v2", + signal: controller.signal, + }); + await expect(async () => { + for await (const _ of eventStream) { + // Abort after the first chunk + controller.abort(); + } + }).rejects.toThrowError(); +}); diff --git a/langchain-core/src/utils/signal.ts b/langchain-core/src/utils/signal.ts index b2f339911033..535faaa8502f 100644 --- a/langchain-core/src/utils/signal.ts +++ b/langchain-core/src/utils/signal.ts @@ -5,12 +5,13 @@ export async function raceWithSignal( if (signal === undefined) { return promise; } - if (signal.aborted) { - throw new Error("AbortError"); - } return Promise.race([ promise, new Promise((_, reject) => { + // Must be inside of the promise to avoid a race condition + if (signal.aborted) { + return reject(new Error("Aborted")); + } signal.addEventListener("abort", () => reject(new Error("Aborted"))); }), ]); From 64853b54d2ec49db12601888f0a6c20b90a92ee6 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Thu, 1 Aug 2024 13:42:50 -0700 Subject: [PATCH 11/16] Adds test --- .../src/runnables/tests/runnable_map.test.ts | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/langchain-core/src/runnables/tests/runnable_map.test.ts b/langchain-core/src/runnables/tests/runnable_map.test.ts index b854738871db..77f662751d08 100644 --- a/langchain-core/src/runnables/tests/runnable_map.test.ts +++ b/langchain-core/src/runnables/tests/runnable_map.test.ts @@ -75,8 +75,8 @@ test("Test invoke with signal", async () => { return "SOME STUFF"; }, }); - const controller = new AbortController(); await expect(async () => { + const controller = new AbortController(); await Promise.all([ map.invoke("testing", { signal: controller.signal, @@ -87,6 +87,20 @@ test("Test invoke with signal", async () => { }), ]); }).rejects.toThrowError(); + await expect(async () => { + const controller = new AbortController(); + await Promise.all([ + map.invoke("testing", { + signal: controller.signal, + }), + new Promise((resolve) => { + setTimeout(() => { + controller.abort(); + resolve(); + }, 250); + }), + ]); + }).rejects.toThrowError(); }); test("Test stream with signal", async () => { From 1c6c2b66d6b0731f5fb07ca16e6c20190ecdec6d Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Thu, 1 Aug 2024 14:03:09 -0700 Subject: [PATCH 12/16] More robust fix --- langchain-core/src/utils/signal.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/langchain-core/src/utils/signal.ts b/langchain-core/src/utils/signal.ts index 535faaa8502f..b0a7871470e6 100644 --- a/langchain-core/src/utils/signal.ts +++ b/langchain-core/src/utils/signal.ts @@ -8,11 +8,11 @@ export async function raceWithSignal( return Promise.race([ promise, new Promise((_, reject) => { - // Must be inside of the promise to avoid a race condition + signal.addEventListener("abort", () => reject(new Error("Aborted"))); + // Must be here inside the promise to avoid a race condition if (signal.aborted) { return reject(new Error("Aborted")); } - signal.addEventListener("abort", () => reject(new Error("Aborted"))); }), ]); } From de8df005dce1348d78a136ea048acfa0782acf21 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Thu, 1 Aug 2024 14:24:16 -0700 Subject: [PATCH 13/16] Ignore thrown errors after aborting a signal --- langchain-core/src/utils/signal.ts | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/langchain-core/src/utils/signal.ts b/langchain-core/src/utils/signal.ts index b0a7871470e6..7ccb554429cf 100644 --- a/langchain-core/src/utils/signal.ts +++ b/langchain-core/src/utils/signal.ts @@ -6,12 +6,20 @@ export async function raceWithSignal( return promise; } return Promise.race([ - promise, + promise.catch((err) => { + if (!signal?.aborted) { + throw err; + } else { + return undefined as T; + } + }), new Promise((_, reject) => { - signal.addEventListener("abort", () => reject(new Error("Aborted"))); + signal.addEventListener("abort", () => { + reject(new Error("Aborted")); + }); // Must be here inside the promise to avoid a race condition if (signal.aborted) { - return reject(new Error("Aborted")); + reject(new Error("Aborted")); } }), ]); From a44c38a7c748a070824978a3780413fdffb822a3 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 2 Aug 2024 16:01:11 -0700 Subject: [PATCH 14/16] Adds test cases, fix streaming for generators --- .../src/runnables/tests/signal.test.ts | 153 ++++++++++++++++++ langchain-core/src/utils/stream.ts | 3 +- langchain-core/src/utils/testing/index.ts | 15 +- 3 files changed, 168 insertions(+), 3 deletions(-) create mode 100644 langchain-core/src/runnables/tests/signal.test.ts diff --git a/langchain-core/src/runnables/tests/signal.test.ts b/langchain-core/src/runnables/tests/signal.test.ts new file mode 100644 index 000000000000..565fc3267b18 --- /dev/null +++ b/langchain-core/src/runnables/tests/signal.test.ts @@ -0,0 +1,153 @@ +/* eslint-disable no-promise-executor-return */ +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { + Runnable, + RunnableLambda, + RunnableMap, + RunnablePassthrough, + RunnableSequence, + RunnableWithMessageHistory, +} from "../index.js"; +import { + FakeChatMessageHistory, + FakeListChatModel, +} from "../../utils/testing/index.js"; + +const chatModel = new FakeListChatModel({ responses: ["hey"], sleep: 500 }); + +const TEST_CASES = { + map: { + runnable: RunnableMap.from({ + question: new RunnablePassthrough(), + context: async () => { + await new Promise((resolve) => setTimeout(resolve, 500)); + return "SOME STUFF"; + }, + }), + input: "testing", + }, + binding: { + runnable: RunnableLambda.from( + () => new Promise((resolve) => setTimeout(resolve, 500)) + ), + input: "testing", + }, + fallbacks: { + runnable: chatModel + .bind({ thrownErrorString: "expected" }) + .withFallbacks({ fallbacks: [chatModel] }), + input: "testing", + skipStream: true, + }, + sequence: { + runnable: RunnableSequence.from([ + RunnablePassthrough.assign({ + test: () => chatModel, + }), + () => {}, + ]), + input: { question: "testing" }, + }, + lambda: { + runnable: RunnableLambda.from( + () => new Promise((resolve) => setTimeout(resolve, 500)) + ), + input: {}, + }, + history: { + runnable: new RunnableWithMessageHistory({ + runnable: chatModel, + config: {}, + getMessageHistory: () => new FakeChatMessageHistory(), + }), + input: "testing", + }, +}; + +describe.each(Object.keys(TEST_CASES))("Test runnable %s", (name) => { + const { + runnable, + input, + skipStream, + }: { runnable: Runnable; input: any; skipStream?: boolean } = + TEST_CASES[name as keyof typeof TEST_CASES]; + test("Test invoke with signal", async () => { + await expect(async () => { + const controller = new AbortController(); + await Promise.all([ + runnable.invoke(input, { + signal: controller.signal, + }), + new Promise((resolve) => { + controller.abort(); + resolve(); + }), + ]); + }).rejects.toThrowError(); + }); + + test("Test invoke with signal with a delay", async () => { + await expect(async () => { + const controller = new AbortController(); + await Promise.all([ + runnable.invoke(input, { + signal: controller.signal, + }), + new Promise((resolve) => { + setTimeout(() => { + controller.abort(); + resolve(); + }, 250); + }), + ]); + }).rejects.toThrowError(); + }); + + test("Test stream with signal", async () => { + if (skipStream) { + return; + } + const controller = new AbortController(); + await expect(async () => { + const stream = await runnable.stream(input, { + signal: controller.signal, + }); + for await (const _ of stream) { + controller.abort(); + } + }).rejects.toThrowError(); + }); + + test("Test batch with signal", async () => { + await expect(async () => { + const controller = new AbortController(); + await Promise.all([ + runnable.batch([input, input], { + signal: controller.signal, + }), + new Promise((resolve) => { + controller.abort(); + resolve(); + }), + ]); + }).rejects.toThrowError(); + }); + + test("Test batch with signal with a delay", async () => { + await expect(async () => { + const controller = new AbortController(); + await Promise.all([ + runnable.batch([input, input], { + signal: controller.signal, + }), + new Promise((resolve) => { + setTimeout(() => { + controller.abort(); + resolve(); + }, 250); + }), + ]); + }).rejects.toThrowError(); + }); +}); diff --git a/langchain-core/src/utils/stream.ts b/langchain-core/src/utils/stream.ts index f40e997d23fb..91a9810e2d25 100644 --- a/langchain-core/src/utils/stream.ts +++ b/langchain-core/src/utils/stream.ts @@ -201,7 +201,8 @@ export class AsyncGeneratorWithSetup< }) { this.generator = params.generator; this.config = params.config; - this.signal = params.signal; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + this.signal = params.signal ?? (this.config as any)?.signal; // setup is a promise that resolves only after the first iterator value // is available. this is useful when setup of several piped generators // needs to happen in logical order, ie. in the order in which input to diff --git a/langchain-core/src/utils/testing/index.ts b/langchain-core/src/utils/testing/index.ts index 65d197f6c23e..f14629794293 100644 --- a/langchain-core/src/utils/testing/index.ts +++ b/langchain-core/src/utils/testing/index.ts @@ -15,6 +15,7 @@ import { import { Document } from "../../documents/document.js"; import { BaseChatModel, + BaseChatModelCallOptions, BaseChatModelParams, } from "../../language_models/chat_models.js"; import { BaseLLMParams, LLM } from "../../language_models/llms.js"; @@ -324,6 +325,10 @@ export interface FakeChatInput extends BaseChatModelParams { emitCustomEvent?: boolean; } +export interface FakeListChatModelCallOptions extends BaseChatModelCallOptions { + thrownErrorString?: string; +} + /** * A fake Chat Model that returns a predefined list of responses. It can be used * for testing purposes. @@ -344,7 +349,7 @@ export interface FakeChatInput extends BaseChatModelParams { * console.log({ secondResponse }); * ``` */ -export class FakeListChatModel extends BaseChatModel { +export class FakeListChatModel extends BaseChatModel { static lc_name() { return "FakeListChatModel"; } @@ -378,6 +383,9 @@ export class FakeListChatModel extends BaseChatModel { runManager?: CallbackManagerForLLMRun ): Promise { await this._sleepIfRequested(); + if (options?.thrownErrorString) { + throw new Error(options.thrownErrorString); + } if (this.emitCustomEvent) { await runManager?.handleCustomEvent("some_test_event", { someval: true, @@ -408,7 +416,7 @@ export class FakeListChatModel extends BaseChatModel { async *_streamResponseChunks( _messages: BaseMessage[], - _options: this["ParsedCallOptions"], + options: this["ParsedCallOptions"], runManager?: CallbackManagerForLLMRun ): AsyncGenerator { const response = this._currentResponse(); @@ -421,6 +429,9 @@ export class FakeListChatModel extends BaseChatModel { for await (const text of response) { await this._sleepIfRequested(); + if (options?.thrownErrorString) { + throw new Error(options.thrownErrorString); + } const chunk = this._createResponseChunk(text); yield chunk; void runManager?.handleLLMNewToken(text); From 65adf442b815389898dc65bbc7f8d404bcbb922c Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 2 Aug 2024 16:20:12 -0700 Subject: [PATCH 15/16] Remove redundant test --- .../src/runnables/tests/runnable_map.test.ts | 53 ------------------- 1 file changed, 53 deletions(-) diff --git a/langchain-core/src/runnables/tests/runnable_map.test.ts b/langchain-core/src/runnables/tests/runnable_map.test.ts index 77f662751d08..ffb41ab4e527 100644 --- a/langchain-core/src/runnables/tests/runnable_map.test.ts +++ b/langchain-core/src/runnables/tests/runnable_map.test.ts @@ -67,59 +67,6 @@ test("Test map inference in a sequence", async () => { ); }); -test("Test invoke with signal", async () => { - const map = RunnableMap.from({ - question: new RunnablePassthrough(), - context: async () => { - await new Promise((resolve) => setTimeout(resolve, 500)); - return "SOME STUFF"; - }, - }); - await expect(async () => { - const controller = new AbortController(); - await Promise.all([ - map.invoke("testing", { - signal: controller.signal, - }), - new Promise((resolve) => { - controller.abort(); - resolve(); - }), - ]); - }).rejects.toThrowError(); - await expect(async () => { - const controller = new AbortController(); - await Promise.all([ - map.invoke("testing", { - signal: controller.signal, - }), - new Promise((resolve) => { - setTimeout(() => { - controller.abort(); - resolve(); - }, 250); - }), - ]); - }).rejects.toThrowError(); -}); - -test("Test stream with signal", async () => { - const map = RunnableMap.from({ - question: new RunnablePassthrough(), - context: async () => { - await new Promise((resolve) => setTimeout(resolve, 500)); - return "SOME STUFF"; - }, - }); - const controller = new AbortController(); - await expect(async () => { - const stream = await map.stream("TESTING", { signal: controller.signal }); - for await (const _ of stream) { - controller.abort(); - } - }).rejects.toThrowError(); -}); - test("Should not allow mismatched inputs", async () => { const prompt = ChatPromptTemplate.fromTemplate( "context: {context}, question: {question}" From fbf35f9b4cfa9f18b30ad1aa1a858ad1688d56f5 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Fri, 2 Aug 2024 16:24:18 -0700 Subject: [PATCH 16/16] Fix --- langchain-core/src/runnables/tests/signal.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/langchain-core/src/runnables/tests/signal.test.ts b/langchain-core/src/runnables/tests/signal.test.ts index 565fc3267b18..7413ea3794cf 100644 --- a/langchain-core/src/runnables/tests/signal.test.ts +++ b/langchain-core/src/runnables/tests/signal.test.ts @@ -1,6 +1,7 @@ /* eslint-disable no-promise-executor-return */ /* eslint-disable @typescript-eslint/no-explicit-any */ +import { test, describe, expect } from "@jest/globals"; import { Runnable, RunnableLambda,