Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: Add signal/timeout options to RunnableConfig #6305

Merged
merged 20 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions langchain-core/src/language_models/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,18 +207,6 @@ export interface BaseLanguageModelCallOptions extends RunnableConfig {
* If not provided, the default stop tokens for the model will be used.
*/
stop?: string[];

/**
* Timeout for this call in milliseconds.
*/
timeout?: number;

/**
* Abort signal for this call.
* If provided, the call will be aborted when the signal is aborted.
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
*/
signal?: AbortSignal;
}

export interface FunctionDefinition {
Expand Down
18 changes: 9 additions & 9 deletions langchain-core/src/language_models/chat_models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ export abstract class BaseChatModel<
> extends BaseLanguageModel<OutputMessageType, CallOptions> {
declare ParsedCallOptions: Omit<
CallOptions,
keyof RunnableConfig & "timeout"
Exclude<keyof RunnableConfig, "signal">
>;

// Only ever instantiated in main LangChain
Expand All @@ -159,14 +159,13 @@ export abstract class BaseChatModel<
...llmOutputs: LLMResult["llmOutput"][]
): LLMResult["llmOutput"];

protected _separateRunnableConfigFromCallOptions(
protected _separateRunnableConfigFromCallOptionsCompat(
options?: Partial<CallOptions>
): [RunnableConfig, this["ParsedCallOptions"]] {
// For backwards compat, keep `signal` in both runnableConfig and callOptions
const [runnableConfig, callOptions] =
super._separateRunnableConfigFromCallOptions(options);
if (callOptions?.timeout && !callOptions.signal) {
callOptions.signal = AbortSignal.timeout(callOptions.timeout);
}
(callOptions as this["ParsedCallOptions"]).signal = runnableConfig.signal;
return [runnableConfig, callOptions as this["ParsedCallOptions"]];
}

Expand Down Expand Up @@ -232,7 +231,7 @@ export abstract class BaseChatModel<
const prompt = BaseChatModel._convertInputToPromptValue(input);
const messages = prompt.toChatMessages();
const [runnableConfig, callOptions] =
this._separateRunnableConfigFromCallOptions(options);
this._separateRunnableConfigFromCallOptionsCompat(options);

const inheritableMetadata = {
...runnableConfig.metadata,
Expand Down Expand Up @@ -578,16 +577,17 @@ export abstract class BaseChatModel<
);

const [runnableConfig, callOptions] =
this._separateRunnableConfigFromCallOptions(parsedOptions);
this._separateRunnableConfigFromCallOptionsCompat(parsedOptions);
runnableConfig.callbacks = runnableConfig.callbacks ?? callbacks;

if (!this.cache) {
return this._generateUncached(baseMessages, callOptions, runnableConfig);
}

const { cache } = this;
const llmStringKey =
this._getSerializedCacheKeyParametersForCall(callOptions);
const llmStringKey = this._getSerializedCacheKeyParametersForCall(
callOptions as CallOptions
);

const { generations, missingPromptIndices } = await this._generateCached({
messages: baseMessages,
Expand Down
18 changes: 9 additions & 9 deletions langchain-core/src/language_models/llms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export abstract class BaseLLM<
> extends BaseLanguageModel<string, CallOptions> {
declare ParsedCallOptions: Omit<
CallOptions,
keyof RunnableConfig & "timeout"
Exclude<keyof RunnableConfig, "signal">
>;

// Only ever instantiated in main LangChain
Expand Down Expand Up @@ -103,14 +103,13 @@ export abstract class BaseLLM<
throw new Error("Not implemented.");
}

protected _separateRunnableConfigFromCallOptions(
protected _separateRunnableConfigFromCallOptionsCompat(
options?: Partial<CallOptions>
): [RunnableConfig, this["ParsedCallOptions"]] {
// For backwards compat, keep `signal` in both runnableConfig and callOptions
const [runnableConfig, callOptions] =
super._separateRunnableConfigFromCallOptions(options);
if (callOptions?.timeout && !callOptions.signal) {
callOptions.signal = AbortSignal.timeout(callOptions.timeout);
}
(callOptions as this["ParsedCallOptions"]).signal = runnableConfig.signal;
return [runnableConfig, callOptions as this["ParsedCallOptions"]];
}

Expand All @@ -126,7 +125,7 @@ export abstract class BaseLLM<
} else {
const prompt = BaseLLM._convertInputToPromptValue(input);
const [runnableConfig, callOptions] =
this._separateRunnableConfigFromCallOptions(options);
this._separateRunnableConfigFromCallOptionsCompat(options);
const callbackManager_ = await CallbackManager.configure(
runnableConfig.callbacks,
this.callbacks,
Expand Down Expand Up @@ -461,16 +460,17 @@ export abstract class BaseLLM<
}

const [runnableConfig, callOptions] =
this._separateRunnableConfigFromCallOptions(parsedOptions);
this._separateRunnableConfigFromCallOptionsCompat(parsedOptions);
runnableConfig.callbacks = runnableConfig.callbacks ?? callbacks;

if (!this.cache) {
return this._generateUncached(prompts, callOptions, runnableConfig);
}

const { cache } = this;
const llmStringKey =
this._getSerializedCacheKeyParametersForCall(callOptions);
const llmStringKey = this._getSerializedCacheKeyParametersForCall(
callOptions as CallOptions
);
const { generations, missingPromptIndices } = await this._generateCached({
prompts,
cache,
Expand Down
60 changes: 49 additions & 11 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
pipeGeneratorWithSetup,
AsyncGeneratorWithSetup,
} from "../utils/stream.js";
import { raceWithSignal } from "../utils/signal.js";
import {
DEFAULT_RECURSION_LIMIT,
RunnableConfig,
Expand Down Expand Up @@ -339,6 +340,8 @@ export abstract class Runnable<
recursionLimit: options.recursionLimit,
maxConcurrency: options.maxConcurrency,
runId: options.runId,
timeout: options.timeout,
signal: options.signal,
});
}
const callOptions = { ...(options as Partial<CallOptions>) };
Expand All @@ -350,6 +353,8 @@ export abstract class Runnable<
delete callOptions.recursionLimit;
delete callOptions.maxConcurrency;
delete callOptions.runId;
delete callOptions.timeout;
delete callOptions.signal;
return [runnableConfig, callOptions];
}

Expand Down Expand Up @@ -378,7 +383,8 @@ export abstract class Runnable<
delete config.runId;
let output;
try {
output = await func.call(this, input, config, runManager);
const promise = func.call(this, input, config, runManager);
output = await raceWithSignal(promise, options?.signal);
} catch (e) {
await runManager?.handleChainError(e);
throw e;
Expand Down Expand Up @@ -430,13 +436,14 @@ export abstract class Runnable<
);
let outputs: (RunOutput | Error)[];
try {
outputs = await func.call(
const promise = func.call(
this,
inputs,
optionsList,
runManagers,
batchOptions
);
outputs = await raceWithSignal(promise, optionsList?.[0]?.signal);
} catch (e) {
await Promise.all(
runManagers.map((runManager) => runManager?.handleChainError(e))
Expand Down Expand Up @@ -509,6 +516,7 @@ export abstract class Runnable<
undefined,
config.runName ?? this.getName()
),
options?.signal,
config
);
delete config.runId;
Expand Down Expand Up @@ -1750,14 +1758,18 @@ export class RunnableSequence<
const initialSteps = [this.first, ...this.middle];
for (let i = 0; i < initialSteps.length; i += 1) {
const step = initialSteps[i];
nextStepInput = await step.invoke(
const promise = step.invoke(
nextStepInput,
patchConfig(config, {
callbacks: runManager?.getChild(`seq:step:${i + 1}`),
})
);
nextStepInput = await raceWithSignal(promise, options?.signal);
}
// TypeScript can't detect that the last output of the sequence returns RunOutput, so call it out of the loop here
if (options?.signal?.aborted) {
throw new Error("Aborted");
}
finalOutput = await this.last.invoke(
nextStepInput,
patchConfig(config, {
Expand Down Expand Up @@ -1819,14 +1831,15 @@ export class RunnableSequence<
try {
for (let i = 0; i < this.steps.length; i += 1) {
const step = this.steps[i];
nextStepInputs = await step.batch(
const promise = step.batch(
nextStepInputs,
runManagers.map((runManager, j) => {
const childRunManager = runManager?.getChild(`seq:step:${i + 1}`);
return patchConfig(configList[j], { callbacks: childRunManager });
}),
batchOptions
);
nextStepInputs = await raceWithSignal(promise, configList[0]?.signal);
}
} catch (e) {
await Promise.all(
Expand Down Expand Up @@ -1880,6 +1893,7 @@ export class RunnableSequence<
);
}
for await (const chunk of finalGenerator) {
options?.signal?.throwIfAborted();
yield chunk;
if (concatSupported) {
if (finalOutput === undefined) {
Expand Down Expand Up @@ -2058,16 +2072,26 @@ export class RunnableMap<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const output: Record<string, any> = {};
try {
await Promise.all(
Object.entries(this.steps).map(async ([key, runnable]) => {
const promises = Object.entries(this.steps).map(
async ([key, runnable]) => {
output[key] = await runnable.invoke(
input,
patchConfig(config, {
callbacks: runManager?.getChild(`map:key:${key}`),
})
);
})
}
);
if (options?.signal) {
promises.push(
new Promise<never>((_, reject) => {
options.signal?.addEventListener("abort", () =>
reject(new Error("Aborted"))
);
})
);
}
await Promise.all(promises);
} catch (e) {
await runManager?.handleChainError(e);
throw e;
Expand Down Expand Up @@ -2101,7 +2125,11 @@ export class RunnableMap<
// starting new iterations as needed,
// until all iterators are done
while (tasks.size) {
const { key, result, gen } = await Promise.race(tasks.values());
const promise = Promise.race(tasks.values());
const { key, result, gen } = await raceWithSignal(
promise,
options?.signal
);
tasks.delete(key);
if (!result.done) {
yield { [key]: result.value } as unknown as RunOutput;
Expand Down Expand Up @@ -2172,28 +2200,32 @@ export class RunnableTraceable<RunInput, RunOutput> extends Runnable<
async invoke(input: RunInput, options?: Partial<RunnableConfig>) {
const [config] = this._getOptionsList(options ?? {}, 1);
const callbacks = await getCallbackManagerForConfig(config);

return (await this.func(
const promise = this.func(
patchConfig(config, { callbacks }),
input
)) as RunOutput;
) as Promise<RunOutput>;

return raceWithSignal(promise, config?.signal);
}

async *_streamIterator(
input: RunInput,
options?: Partial<RunnableConfig>
): AsyncGenerator<RunOutput> {
const [config] = this._getOptionsList(options ?? {}, 1);
const result = await this.invoke(input, options);

if (isAsyncIterable(result)) {
for await (const item of result) {
config?.signal?.throwIfAborted();
yield item as RunOutput;
}
return;
}

if (isIterator(result)) {
while (true) {
config?.signal?.throwIfAborted();
const state: IteratorResult<unknown> = result.next();
if (state.done) break;
yield state.value as RunOutput;
Expand Down Expand Up @@ -2320,6 +2352,7 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
childConfig,
output
)) {
config?.signal?.throwIfAborted();
if (finalOutput === undefined) {
finalOutput = chunk as RunOutput;
} else {
Expand All @@ -2339,6 +2372,7 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
childConfig,
output
)) {
config?.signal?.throwIfAborted();
if (finalOutput === undefined) {
finalOutput = chunk as RunOutput;
} else {
Expand Down Expand Up @@ -2423,10 +2457,12 @@ export class RunnableLambda<RunInput, RunOutput> extends Runnable<
childConfig,
output
)) {
config?.signal?.throwIfAborted();
yield chunk as RunOutput;
}
} else if (isIterableIterator(output)) {
for (const chunk of consumeIteratorInContext(childConfig, output)) {
config?.signal?.throwIfAborted();
yield chunk as RunOutput;
}
} else {
Expand Down Expand Up @@ -2517,6 +2553,7 @@ export class RunnableWithFallbacks<RunInput, RunOutput> extends Runnable<
);
let firstError;
for (const runnable of this.runnables()) {
config?.signal?.throwIfAborted();
try {
const output = await runnable.invoke(
input,
Expand Down Expand Up @@ -2586,6 +2623,7 @@ export class RunnableWithFallbacks<RunInput, RunOutput> extends Runnable<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let firstError: any;
for (const runnable of this.runnables()) {
configList[0].signal?.throwIfAborted();
try {
const outputs = await runnable.batch(
inputs,
Expand Down
Loading
Loading