From 621d99a3ca0af657b09b48ce9a4d9b5b77fa930e Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Mon, 23 Dec 2024 13:07:35 -0800 Subject: [PATCH 1/3] Adds manual flush mode for high volume serverless environments --- js/src/client.ts | 38 ++++++++++-- js/src/tests/batch_client.test.ts | 100 ++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+), 4 deletions(-) diff --git a/js/src/client.ts b/js/src/client.ts index 802e26ce2..ce1ffb735 100644 --- a/js/src/client.ts +++ b/js/src/client.ts @@ -82,6 +82,11 @@ export interface ClientConfig { blockOnRootRunFinalization?: boolean; traceBatchConcurrency?: number; fetchOptions?: RequestInit; + /** + * Whether to require manual .flush() calls before sending traces. + * Useful to solve rate limits at trace high volumes. + */ + manualFlushMode?: boolean; } /** @@ -488,6 +493,8 @@ export class Client implements LangSmithTracingClientInterface { // eslint-disable-next-line @typescript-eslint/no-explicit-any private _getServerInfoPromise?: Promise>; + private manualFlushMode = false; + constructor(config: ClientConfig = {}) { const defaultConfig = Client.getDefaultClientConfig(); @@ -525,6 +532,7 @@ export class Client implements LangSmithTracingClientInterface { config.blockOnRootRunFinalization ?? this.blockOnRootRunFinalization; this.batchSizeBytesLimit = config.batchSizeBytesLimit; this.fetchOptions = config.fetchOptions || {}; + this.manualFlushMode = config.manualFlushMode ?? this.manualFlushMode; } public static getDefaultClientConfig(): { @@ -775,14 +783,17 @@ export class Client implements LangSmithTracingClientInterface { } private drainAutoBatchQueue(batchSizeLimit: number) { + const promises = []; while (this.autoBatchQueue.items.length > 0) { const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit); if (!batch.length) { done(); break; } - void this._processBatch(batch, done).catch(console.error); + const batchPromise = this._processBatch(batch, done).catch(console.error); + promises.push(batchPromise); } + return Promise.all(promises); } private async _processBatch(batch: AutoBatchQueueItem[], done: () => void) { @@ -817,14 +828,18 @@ export class Client implements LangSmithTracingClientInterface { item.item = mergeRuntimeEnvIntoRunCreate(item.item as RunCreate); } const itemPromise = this.autoBatchQueue.push(item); + if (this.manualFlushMode) { + // Rely on manual flushing in serverless environments + return itemPromise; + } const sizeLimitBytes = await this._getBatchSizeLimitBytes(); if (this.autoBatchQueue.sizeBytes > sizeLimitBytes) { - this.drainAutoBatchQueue(sizeLimitBytes); + void this.drainAutoBatchQueue(sizeLimitBytes); } if (this.autoBatchQueue.items.length > 0) { this.autoBatchTimeout = setTimeout(() => { this.autoBatchTimeout = undefined; - this.drainAutoBatchQueue(sizeLimitBytes); + void this.drainAutoBatchQueue(sizeLimitBytes); }, this.autoBatchAggregationDelayMs); } return itemPromise; @@ -872,6 +887,14 @@ export class Client implements LangSmithTracingClientInterface { return await this.settings; } + /** + * Flushes current queued traces. + */ + public async flush() { + const sizeLimitBytes = await this._getBatchSizeLimitBytes(); + await this.drainAutoBatchQueue(sizeLimitBytes); + } + public async createRun(run: CreateRunParams): Promise { if (!this._filterForSampling([run]).length) { return; @@ -1239,7 +1262,8 @@ export class Client implements LangSmithTracingClientInterface { if ( run.end_time !== undefined && data.parent_run_id === undefined && - this.blockOnRootRunFinalization + this.blockOnRootRunFinalization && + !this.manualFlushMode ) { // Trigger batches as soon as a root trace ends and wait to ensure trace finishes // in serverless environments. @@ -4381,6 +4405,12 @@ export class Client implements LangSmithTracingClientInterface { * @returns A promise that resolves once all currently pending traces have sent. */ public awaitPendingTraceBatches() { + if (this.manualFlushMode) { + console.warn( + "[WARNING]: When tracing in manual flush mode, you must call `await client.flush()` manually to submit trace batches." + ); + return Promise.resolve(); + } return Promise.all([ ...this.autoBatchQueue.items.map(({ itemPromise }) => itemPromise), this.batchIngestCaller.queue.onIdle(), diff --git a/js/src/tests/batch_client.test.ts b/js/src/tests/batch_client.test.ts index e3b358aa6..5c40ac1ae 100644 --- a/js/src/tests/batch_client.test.ts +++ b/js/src/tests/batch_client.test.ts @@ -667,6 +667,106 @@ describe.each(ENDPOINT_TYPES)( }); }); + it("should flush traces in batches with manualFlushMode enabled", async () => { + const client = new Client({ + apiKey: "test-api-key", + batchSizeBytesLimit: 10000, + autoBatchTracing: true, + manualFlushMode: true, + }); + const callSpy = jest + .spyOn((client as any).batchIngestCaller, "call") + .mockResolvedValue({ + ok: true, + text: () => "", + }); + jest.spyOn(client as any, "_getServerInfo").mockImplementation(() => { + return { + version: "foo", + batch_ingest_config: { ...extraBatchIngestConfig }, + }; + }); + const projectName = "__test_batch"; + + const runIds = await Promise.all( + [...Array(15)].map(async (_, i) => { + const runId = uuidv4(); + const dottedOrder = convertToDottedOrderFormat( + new Date().getTime() / 1000, + runId + ); + const params = mergeRuntimeEnvIntoRunCreate({ + id: runId, + project_name: projectName, + name: "test_run " + i, + run_type: "llm", + inputs: { text: "hello world " + i }, + trace_id: runId, + dotted_order: dottedOrder, + } as RunCreate); + // Allow some extra space for other request properties + const mockRunSize = 950; + const padCount = mockRunSize - JSON.stringify(params).length; + params.inputs.text = params.inputs.text + "x".repeat(padCount); + await client.createRun(params); + return runId; + }) + ); + + await new Promise((resolve) => setTimeout(resolve, 500)); + + expect(callSpy.mock.calls.length).toBe(0); + + await client.flush(); + + expect(callSpy.mock.calls.length).toBe(2); + + const calledRequestParam: any = callSpy.mock.calls[0][2]; + const calledRequestParam2: any = callSpy.mock.calls[1][2]; + + const firstBatchBody = await parseMockRequestBody( + calledRequestParam?.body + ); + const secondBatchBody = await parseMockRequestBody( + calledRequestParam2?.body + ); + + const initialBatchBody = + firstBatchBody.post.length === 10 ? firstBatchBody : secondBatchBody; + const followupBatchBody = + firstBatchBody.post.length === 10 ? secondBatchBody : firstBatchBody; + + // Queue should drain as soon as size limit is reached, + // sending both batches + expect(initialBatchBody).toEqual({ + post: runIds.slice(0, 10).map((runId, i) => + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + text: expect.stringContaining("hello world " + i), + }, + trace_id: runId, + }) + ), + patch: [], + }); + + expect(followupBatchBody).toEqual({ + post: runIds.slice(10).map((runId, i) => + expect.objectContaining({ + id: runId, + run_type: "llm", + inputs: { + text: expect.stringContaining("hello world " + (i + 10)), + }, + trace_id: runId, + }) + ), + patch: [], + }); + }); + it("a very low batch size limit should be equivalent to single calls", async () => { const client = new Client({ apiKey: "test-api-key", From fbebd678f5d38602cabc6289f4fbce2cced4b09c Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Mon, 23 Dec 2024 13:09:45 -0800 Subject: [PATCH 2/3] Bump version --- js/package.json | 2 +- js/src/index.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/js/package.json b/js/package.json index 9d319fcb7..7b833e87c 100644 --- a/js/package.json +++ b/js/package.json @@ -1,6 +1,6 @@ { "name": "langsmith", - "version": "0.2.13", + "version": "0.2.14", "description": "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform.", "packageManager": "yarn@1.22.19", "files": [ diff --git a/js/src/index.ts b/js/src/index.ts index d9bc0220c..ac2c99b8c 100644 --- a/js/src/index.ts +++ b/js/src/index.ts @@ -18,4 +18,4 @@ export { RunTree, type RunTreeConfig } from "./run_trees.js"; export { overrideFetchImplementation } from "./singletons/fetch.js"; // Update using yarn bump-version -export const __version__ = "0.2.13"; +export const __version__ = "0.2.14"; From 75615c57b6e1f25b2cf8d51b338bf6afb79185f8 Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Mon, 23 Dec 2024 13:11:59 -0800 Subject: [PATCH 3/3] Fix docstring typo --- js/src/client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/src/client.ts b/js/src/client.ts index ce1ffb735..47079823b 100644 --- a/js/src/client.ts +++ b/js/src/client.ts @@ -84,7 +84,7 @@ export interface ClientConfig { fetchOptions?: RequestInit; /** * Whether to require manual .flush() calls before sending traces. - * Useful to solve rate limits at trace high volumes. + * Useful if encountering network rate limits at trace high volumes. */ manualFlushMode?: boolean; }