diff --git a/langchain-core/src/callbacks/manager.ts b/langchain-core/src/callbacks/manager.ts
index 9a6cbe9405cf..222e543cb09f 100644
--- a/langchain-core/src/callbacks/manager.ts
+++ b/langchain-core/src/callbacks/manager.ts
@@ -154,6 +154,42 @@ export class BaseRunManager {
       )
     );
   }
+
+  async handleCustomEvent(
+    eventName: string,
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    data: any,
+    _runId?: string,
+    _tags?: string[],
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    _metadata?: Record<string, any>
+  ): Promise<void> {
+    await Promise.all(
+      this.handlers.map((handler) =>
+        consumeCallback(async () => {
+          try {
+            await handler.handleCustomEvent?.(
+              eventName,
+              data,
+              this.runId,
+              this.tags,
+              this.metadata
+            );
+          } catch (err) {
+            const logFunction = handler.raiseError
+              ? console.error
+              : console.warn;
+            logFunction(
+              `Error in handler ${handler.constructor.name}, handleCustomEvent: ${err}`
+            );
+            if (handler.raiseError) {
+              throw err;
+            }
+          }
+        }, handler.awaitHandlers)
+      )
+    );
+  }
 }
 
 /**
diff --git a/langchain-core/src/language_models/tests/chat_models.test.ts b/langchain-core/src/language_models/tests/chat_models.test.ts
index 6ddb97f3d067..5cda76ea227c 100644
--- a/langchain-core/src/language_models/tests/chat_models.test.ts
+++ b/langchain-core/src/language_models/tests/chat_models.test.ts
@@ -228,3 +228,40 @@ test("Test ChatModel can cache complex messages", async () => {
   const cachedMsg = value[0].message as AIMessage;
   expect(cachedMsg.content).toEqual(JSON.stringify(contentToCache, null, 2));
 });
+
+test("Test ChatModel can emit a custom event", async () => {
+  const model = new FakeListChatModel({
+    responses: ["hi"],
+    emitCustomEvent: true,
+  });
+  let customEvent;
+  const response = await model.invoke([["human", "Hello there!"]], {
+    callbacks: [
+      {
+        handleCustomEvent(_, data) {
+          customEvent = data;
+        },
+      },
+    ],
+  });
+  await new Promise((resolve) => setTimeout(resolve, 100));
+  expect(response.content).toEqual("hi");
+  expect(customEvent).toBeDefined();
+});
+
+test("Test ChatModel can stream back a custom event", async () => {
+  const model = new FakeListChatModel({
+    responses: ["hi"],
+    emitCustomEvent: true,
+  });
+  let customEvent;
+  const eventStream = await model.streamEvents([["human", "Hello there!"]], {
+    version: "v2",
+  });
+  for await (const event of eventStream) {
+    if (event.event === "on_custom_event") {
+      customEvent = event;
+    }
+  }
+  expect(customEvent).toBeDefined();
+});
diff --git a/langchain-core/src/utils/testing/index.ts b/langchain-core/src/utils/testing/index.ts
index 73e79f892fc1..685fae8d3749 100644
--- a/langchain-core/src/utils/testing/index.ts
+++ b/langchain-core/src/utils/testing/index.ts
@@ -320,6 +320,8 @@ export interface FakeChatInput extends BaseChatModelParams {
 
   /** Time to sleep in milliseconds between responses */
   sleep?: number;
+
+  emitCustomEvent?: boolean;
 }
 
 /**
@@ -353,10 +355,13 @@ export class FakeListChatModel extends BaseChatModel {
 
   sleep?: number;
 
-  constructor({ responses, sleep }: FakeChatInput) {
+  emitCustomEvent = false;
+
+  constructor({ responses, sleep, emitCustomEvent }: FakeChatInput) {
     super({});
     this.responses = responses;
     this.sleep = sleep;
+    this.emitCustomEvent = emitCustomEvent ?? this.emitCustomEvent;
   }
 
   _combineLLMOutput() {
@@ -369,9 +374,15 @@ export class FakeListChatModel extends BaseChatModel {
 
   async _generate(
     _messages: BaseMessage[],
-    options?: this["ParsedCallOptions"]
+    options?: this["ParsedCallOptions"],
+    runManager?: CallbackManagerForLLMRun
  ): Promise<ChatResult> {
     await this._sleepIfRequested();
+    if (this.emitCustomEvent) {
+      await runManager?.handleCustomEvent("some_test_event", {
+        someval: true,
+      });
+    }
 
     if (options?.stop?.length) {
       return {
@@ -402,6 +413,11 @@ export class FakeListChatModel extends BaseChatModel {
   ): AsyncGenerator<ChatGenerationChunk> {
     const response = this._currentResponse();
     this._incrementResponse();
+    if (this.emitCustomEvent) {
+      await runManager?.handleCustomEvent("some_test_event", {
+        someval: true,
+      });
+    }
     for await (const text of response) {
       await this._sleepIfRequested();
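
Usage note (not part of the patch): a minimal sketch of how a caller would observe the custom event this diff adds, using only APIs the patch itself touches — the FakeListChatModel test model and the v2 streamEvents interface. The event shape (name/data fields on the "on_custom_event" entry) is inferred from the streaming test above and should be treated as an assumption.

    import { FakeListChatModel } from "@langchain/core/utils/testing";

    // Runs in an async context (e.g. an ES module with top-level await).
    const model = new FakeListChatModel({
      responses: ["hi"],
      emitCustomEvent: true,
    });

    // Events emitted via runManager.handleCustomEvent surface in the v2
    // event stream as "on_custom_event" entries.
    const eventStream = await model.streamEvents([["human", "Hello there!"]], {
      version: "v2",
    });
    for await (const event of eventStream) {
      if (event.event === "on_custom_event") {
        // Assumed: name and data mirror the emitter's arguments,
        // i.e. "some_test_event" and { someval: true } here.
        console.log(event.name, event.data);
      }
    }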