Skip to content

Commit

Permalink
Merge branch 'main' into feature/add-boolean-attribute-support
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-w-nick authored Dec 20, 2024
2 parents 21f1504 + da80dce commit dd2863e
Show file tree
Hide file tree
Showing 26 changed files with 552 additions and 81 deletions.
2 changes: 1 addition & 1 deletion langchain-core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@langchain/core",
"version": "0.3.25",
"version": "0.3.26",
"description": "Core LangChain.js abstractions and schemas",
"type": "module",
"engines": {
Expand Down
49 changes: 28 additions & 21 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2840,7 +2840,7 @@ export class RunnableWithFallbacks<RunInput, RunOutput> extends Runnable<
options?: Partial<RunnableConfig>
): Promise<RunOutput> {
const config = ensureConfig(options);
const callbackManager_ = await getCallbackManagerForConfig(options);
const callbackManager_ = await getCallbackManagerForConfig(config);
const { runId, ...otherConfigFields } = config;
const runManager = await callbackManager_?.handleChainStart(
this.toJSON(),
Expand All @@ -2851,35 +2851,41 @@ export class RunnableWithFallbacks<RunInput, RunOutput> extends Runnable<
undefined,
otherConfigFields?.runName
);
let firstError;
for (const runnable of this.runnables()) {
config?.signal?.throwIfAborted();
try {
const output = await runnable.invoke(
input,
patchConfig(otherConfigFields, { callbacks: runManager?.getChild() })
);
await runManager?.handleChainEnd(_coerceToDict(output, "output"));
return output;
} catch (e) {
const childConfig = patchConfig(otherConfigFields, {
callbacks: runManager?.getChild(),
});
const res = await AsyncLocalStorageProviderSingleton.runWithConfig(
childConfig,
async () => {
let firstError;
for (const runnable of this.runnables()) {
config?.signal?.throwIfAborted();
try {
const output = await runnable.invoke(input, childConfig);
await runManager?.handleChainEnd(_coerceToDict(output, "output"));
return output;
} catch (e) {
if (firstError === undefined) {
firstError = e;
}
}
}
if (firstError === undefined) {
firstError = e;
throw new Error("No error stored at end of fallback.");
}
await runManager?.handleChainError(firstError);
throw firstError;
}
}
if (firstError === undefined) {
throw new Error("No error stored at end of fallback.");
}
await runManager?.handleChainError(firstError);
throw firstError;
);
return res;
}

async *_streamIterator(
input: RunInput,
options?: Partial<RunnableConfig> | undefined
): AsyncGenerator<RunOutput> {
const config = ensureConfig(options);
const callbackManager_ = await getCallbackManagerForConfig(options);
const callbackManager_ = await getCallbackManagerForConfig(config);
const { runId, ...otherConfigFields } = config;
const runManager = await callbackManager_?.handleChainStart(
this.toJSON(),
Expand All @@ -2898,7 +2904,8 @@ export class RunnableWithFallbacks<RunInput, RunOutput> extends Runnable<
callbacks: runManager?.getChild(),
});
try {
stream = await runnable.stream(input, childConfig);
const originalStream = await runnable.stream(input, childConfig);
stream = consumeAsyncIterableInContext(childConfig, originalStream);
break;
} catch (e) {
if (firstError === undefined) {
Expand Down
37 changes: 37 additions & 0 deletions langchain-core/src/runnables/tests/runnable_with_fallbacks.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
/* eslint-disable no-promise-executor-return */
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable no-process-env */
import { test, expect } from "@jest/globals";
import { AsyncLocalStorage } from "node:async_hooks";
import { FakeLLM, FakeStreamingLLM } from "../../utils/testing/index.js";
import { RunnableLambda } from "../base.js";
import { AsyncLocalStorageProviderSingleton } from "../../singletons/index.js";

test("RunnableWithFallbacks", async () => {
const llm = new FakeLLM({
Expand Down Expand Up @@ -55,3 +59,36 @@ test("RunnableWithFallbacks stream", async () => {
expect(chunks.length).toBeGreaterThan(1);
expect(chunks.join("")).toEqual("What up");
});

test("RunnableWithFallbacks stream events with local storage and callbacks added via env vars", async () => {
process.env.LANGCHAIN_VERBOSE = "true";
AsyncLocalStorageProviderSingleton.initializeGlobalInstance(
new AsyncLocalStorage()
);
const llm = new FakeStreamingLLM({
thrownErrorString: "Bad error!",
});
const llmWithFallbacks = llm.withFallbacks({
fallbacks: [new FakeStreamingLLM({})],
});
const runnable = RunnableLambda.from(async (input: any) => {
const res = await llmWithFallbacks.invoke(input);
const stream = await llmWithFallbacks.stream(input);
for await (const _ of stream) {
void _;
}
return res;
});
const stream = await runnable.streamEvents("hi", {
version: "v2",
});
const chunks = [];
for await (const chunk of stream) {
if (chunk.event === "on_llm_stream") {
chunks.push(chunk);
}
}
expect(chunks.length).toBeGreaterThan(1);
console.log(JSON.stringify(chunks, null, 2));
expect(chunks.map((chunk) => chunk.data.chunk.text).join("")).toEqual("hihi");
});
2 changes: 1 addition & 1 deletion libs/langchain-azure-cosmosdb/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@langchain/azure-cosmosdb",
"version": "0.2.5",
"version": "0.2.6",
"description": "Azure CosmosDB integration for LangChain.js",
"type": "module",
"engines": {
Expand Down
178 changes: 178 additions & 0 deletions libs/langchain-azure-cosmosdb/src/caches/caches_mongodb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import {
BaseCache,
deserializeStoredGeneration,
getCacheKey,
serializeGeneration,
} from "@langchain/core/caches";
import { Generation } from "@langchain/core/outputs";
import { Document } from "@langchain/core/documents";
import { EmbeddingsInterface } from "@langchain/core/embeddings";
import { getEnvironmentVariable } from "@langchain/core/utils/env";
import { MongoClient } from "mongodb";
import {
AzureCosmosDBMongoDBConfig,
AzureCosmosDBMongoDBVectorStore,
AzureCosmosDBMongoDBSimilarityType,
} from "../azure_cosmosdb_mongodb.js";

/**
* Represents a Semantic Cache that uses CosmosDB MongoDB backend as the underlying
* storage system.
*
* @example
* ```typescript
* const embeddings = new OpenAIEmbeddings();
* const cache = new AzureCosmosDBMongoDBSemanticCache(embeddings, {
* client?: MongoClient
* });
* const model = new ChatOpenAI({cache});
*
* // Invoke the model to perform an action
* const response = await model.invoke("Do something random!");
* console.log(response);
* ```
*/
export class AzureCosmosDBMongoDBSemanticCache extends BaseCache {
private embeddings: EmbeddingsInterface;

private config: AzureCosmosDBMongoDBConfig;

private similarityScoreThreshold: number;

private cacheDict: { [key: string]: AzureCosmosDBMongoDBVectorStore } = {};

private readonly client: MongoClient | undefined;

private vectorDistanceFunction: string;

constructor(
embeddings: EmbeddingsInterface,
dbConfig: AzureCosmosDBMongoDBConfig,
similarityScoreThreshold: number = 0.6
) {
super();

const connectionString =
dbConfig.connectionString ??
getEnvironmentVariable("AZURE_COSMOSDB_MONGODB_CONNECTION_STRING");

if (!dbConfig.client && !connectionString) {
throw new Error(
"AzureCosmosDBMongoDBSemanticCache client or connection string must be set."
);
}

if (!dbConfig.client) {
this.client = new MongoClient(connectionString!, {
appName: "langchainjs",
});
} else {
this.client = dbConfig.client;
}

this.config = {
...dbConfig,
client: this.client,
collectionName: dbConfig.collectionName ?? "semanticCacheContainer",
};

this.similarityScoreThreshold = similarityScoreThreshold;
this.embeddings = embeddings;
this.vectorDistanceFunction =
dbConfig?.indexOptions?.similarity ??
AzureCosmosDBMongoDBSimilarityType.COS;
}

private getLlmCache(llmKey: string) {
const key = getCacheKey(llmKey);
if (!this.cacheDict[key]) {
this.cacheDict[key] = new AzureCosmosDBMongoDBVectorStore(
this.embeddings,
this.config
);
}
return this.cacheDict[key];
}

/**
* Retrieves data from the cache.
*
* @param prompt The prompt for lookup.
* @param llmKey The LLM key used to construct the cache key.
* @returns An array of Generations if found, null otherwise.
*/
async lookup(prompt: string, llmKey: string): Promise<Generation[] | null> {
const llmCache = this.getLlmCache(llmKey);

const queryEmbedding = await this.embeddings.embedQuery(prompt);
const results = await llmCache.similaritySearchVectorWithScore(
queryEmbedding,
1,
this.config.indexOptions?.indexType
);
if (!results.length) return null;

const generations = results
.flatMap(([document, score]) => {
const isSimilar =
(this.vectorDistanceFunction ===
AzureCosmosDBMongoDBSimilarityType.L2 &&
score <= this.similarityScoreThreshold) ||
(this.vectorDistanceFunction !==
AzureCosmosDBMongoDBSimilarityType.L2 &&
score >= this.similarityScoreThreshold);

if (!isSimilar) return undefined;

return document.metadata.return_value.map((gen: string) =>
deserializeStoredGeneration(JSON.parse(gen))
);
})
.filter((gen) => gen !== undefined);

return generations.length > 0 ? generations : null;
}

/**
* Updates the cache with new data.
*
* @param prompt The prompt for update.
* @param llmKey The LLM key used to construct the cache key.
* @param value The value to be stored in the cache.
*/
public async update(
prompt: string,
llmKey: string,
returnValue: Generation[]
): Promise<void> {
const serializedGenerations = returnValue.map((generation) =>
JSON.stringify(serializeGeneration(generation))
);

const llmCache = this.getLlmCache(llmKey);

const metadata = {
llm_string: llmKey,
prompt,
return_value: serializedGenerations,
};

const doc = new Document({
pageContent: prompt,
metadata,
});

await llmCache.addDocuments([doc]);
}

/**
* deletes the semantic cache for a given llmKey
* @param llmKey
*/
public async clear(llmKey: string) {
const key = getCacheKey(llmKey);
if (this.cacheDict[key]) {
await this.cacheDict[key].delete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { getEnvironmentVariable } from "@langchain/core/utils/env";
import {
AzureCosmosDBNoSQLConfig,
AzureCosmosDBNoSQLVectorStore,
} from "./azure_cosmosdb_nosql.js";
} from "../azure_cosmosdb_nosql.js";

const USER_AGENT_SUFFIX = "langchainjs-cdbnosql-semanticcache-javascript";
const DEFAULT_CONTAINER_NAME = "semanticCacheContainer";
Expand Down
3 changes: 2 additions & 1 deletion libs/langchain-azure-cosmosdb/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from "./azure_cosmosdb_mongodb.js";
export * from "./azure_cosmosdb_nosql.js";
export * from "./caches.js";
export * from "./caches/caches_nosql.js";
export * from "./caches/caches_mongodb.js";
export * from "./chat_histories/nosql.js";
export * from "./chat_histories/mongodb.js";
Loading

0 comments on commit dd2863e

Please sign in to comment.