feat(js): Adds manual flush mode (#1351)
jacoblee93 authored Dec 23, 2024
1 parent d2412db commit 5ae3cf9
Showing 4 changed files with 136 additions and 6 deletions.
2 changes: 1 addition & 1 deletion js/package.json
@@ -1,6 +1,6 @@
 {
   "name": "langsmith",
-  "version": "0.2.13",
+  "version": "0.2.14",
   "description": "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform.",
   "packageManager": "yarn@1.22.19",
   "files": [
38 changes: 34 additions & 4 deletions js/src/client.ts
@@ -82,6 +82,11 @@ export interface ClientConfig {
   blockOnRootRunFinalization?: boolean;
   traceBatchConcurrency?: number;
   fetchOptions?: RequestInit;
+  /**
+   * Whether to require manual .flush() calls before sending traces.
+   * Useful if encountering network rate limits at high trace volumes.
+   */
+  manualFlushMode?: boolean;
 }

 /**
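
A minimal sketch of how the new option is wired up in practice, assuming only the standard `Client` export from the `langsmith` package (the `manualFlushMode` flag is the one piece introduced by this commit):

    import { Client } from "langsmith";

    // With manualFlushMode enabled, runs accumulate in the client's in-memory
    // queue and nothing is sent until client.flush() is called explicitly.
    const client = new Client({
      apiKey: process.env.LANGSMITH_API_KEY,
      manualFlushMode: true,
    });
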
@@ -488,6 +493,8 @@ export class Client implements LangSmithTracingClientInterface {
   // eslint-disable-next-line @typescript-eslint/no-explicit-any
   private _getServerInfoPromise?: Promise<Record<string, any>>;

+  private manualFlushMode = false;
+
   constructor(config: ClientConfig = {}) {
     const defaultConfig = Client.getDefaultClientConfig();

@@ -525,6 +532,7 @@ export class Client implements LangSmithTracingClientInterface {
       config.blockOnRootRunFinalization ?? this.blockOnRootRunFinalization;
     this.batchSizeBytesLimit = config.batchSizeBytesLimit;
     this.fetchOptions = config.fetchOptions || {};
+    this.manualFlushMode = config.manualFlushMode ?? this.manualFlushMode;
   }

   public static getDefaultClientConfig(): {
@@ -775,14 +783,17 @@ export class Client implements LangSmithTracingClientInterface {
   }

   private drainAutoBatchQueue(batchSizeLimit: number) {
+    const promises = [];
     while (this.autoBatchQueue.items.length > 0) {
       const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit);
       if (!batch.length) {
         done();
         break;
       }
-      void this._processBatch(batch, done).catch(console.error);
+      const batchPromise = this._processBatch(batch, done).catch(console.error);
+      promises.push(batchPromise);
     }
+    return Promise.all(promises);
   }

   private async _processBatch(batch: AutoBatchQueueItem[], done: () => void) {
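
The behavioral change here is that `drainAutoBatchQueue` now collects a promise per popped batch and returns `Promise.all` over them, so callers can either fire-and-forget (`void`) or await completion. A simplified model of that pattern (the `send` callback and queue shape below are hypothetical stand-ins, not the library's internals):

    // Collect per-batch promises so the caller chooses between
    // fire-and-forget (void drain(...)) and awaited completion (await drain(...)).
    async function drain<T>(
      queue: T[][],
      send: (batch: T[]) => Promise<void>
    ): Promise<void> {
      const pending: Promise<void>[] = [];
      while (queue.length > 0) {
        const batch = queue.shift()!;
        // Swallow per-batch errors so one failure does not reject the whole drain.
        pending.push(send(batch).catch(console.error));
      }
      await Promise.all(pending);
    }
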
@@ -817,14 +828,18 @@ export class Client implements LangSmithTracingClientInterface {
       item.item = mergeRuntimeEnvIntoRunCreate(item.item as RunCreate);
     }
     const itemPromise = this.autoBatchQueue.push(item);
+    if (this.manualFlushMode) {
+      // Rely on manual flushing in serverless environments
+      return itemPromise;
+    }
     const sizeLimitBytes = await this._getBatchSizeLimitBytes();
     if (this.autoBatchQueue.sizeBytes > sizeLimitBytes) {
-      this.drainAutoBatchQueue(sizeLimitBytes);
+      void this.drainAutoBatchQueue(sizeLimitBytes);
     }
     if (this.autoBatchQueue.items.length > 0) {
       this.autoBatchTimeout = setTimeout(() => {
         this.autoBatchTimeout = undefined;
-        this.drainAutoBatchQueue(sizeLimitBytes);
+        void this.drainAutoBatchQueue(sizeLimitBytes);
       }, this.autoBatchAggregationDelayMs);
     }
     return itemPromise;
@@ -872,6 +887,14 @@ export class Client implements LangSmithTracingClientInterface {
     return await this.settings;
   }

+  /**
+   * Flushes all currently queued traces.
+   */
+  public async flush() {
+    const sizeLimitBytes = await this._getBatchSizeLimitBytes();
+    await this.drainAutoBatchQueue(sizeLimitBytes);
+  }
+
   public async createRun(run: CreateRunParams): Promise<void> {
     if (!this._filterForSampling([run]).length) {
       return;
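
With `flush()` public, the intended call pattern looks roughly like the sketch below. `traceable` from `langsmith/traceable` accepting a per-tracer `client` is existing API; the traced function itself is a made-up example:

    import { Client } from "langsmith";
    import { traceable } from "langsmith/traceable";

    const client = new Client({ manualFlushMode: true });

    // Hypothetical traced function; any traceable-wrapped work queues runs
    // on the client instead of sending them immediately.
    const pipeline = traceable(
      async (text: string) => text.toUpperCase(),
      { name: "pipeline", client }
    );

    await pipeline("hello world");
    // Nothing has hit the network yet; send everything queued so far.
    await client.flush();
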
@@ -1239,7 +1262,8 @@ export class Client implements LangSmithTracingClientInterface {
     if (
       run.end_time !== undefined &&
       data.parent_run_id === undefined &&
-      this.blockOnRootRunFinalization
+      this.blockOnRootRunFinalization &&
+      !this.manualFlushMode
     ) {
       // Trigger batches as soon as a root trace ends and wait to ensure trace finishes
       // in serverless environments.
@@ -4381,6 +4405,12 @@ export class Client implements LangSmithTracingClientInterface {
    * @returns A promise that resolves once all currently pending traces have sent.
    */
   public awaitPendingTraceBatches() {
+    if (this.manualFlushMode) {
+      console.warn(
+        "[WARNING]: When tracing in manual flush mode, you must call `await client.flush()` manually to submit trace batches."
+      );
+      return Promise.resolve();
+    }
     return Promise.all([
       ...this.autoBatchQueue.items.map(({ itemPromise }) => itemPromise),
       this.batchIngestCaller.queue.onIdle(),
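
Because `awaitPendingTraceBatches()` becomes a warn-and-return no-op under manual flush mode, code that previously relied on it at the end of an invocation must call `flush()` instead. A hypothetical serverless handler, reusing the assumed `client` and `pipeline` from the sketch above:

    // Flush before returning, since the runtime may freeze or tear down
    // the environment as soon as the handler resolves.
    export async function handler(event: { text: string }) {
      const result = await pipeline(event.text);
      await client.flush(); // stands in for awaitPendingTraceBatches()
      return result;
    }
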
2 changes: 1 addition & 1 deletion js/src/index.ts
@@ -18,4 +18,4 @@ export { RunTree, type RunTreeConfig } from "./run_trees.js";
 export { overrideFetchImplementation } from "./singletons/fetch.js";

 // Update using yarn bump-version
-export const __version__ = "0.2.13";
+export const __version__ = "0.2.14";
100 changes: 100 additions & 0 deletions js/src/tests/batch_client.test.ts
@@ -667,6 +667,106 @@ describe.each(ENDPOINT_TYPES)(
     });
   });

+  it("should flush traces in batches with manualFlushMode enabled", async () => {
+    const client = new Client({
+      apiKey: "test-api-key",
+      batchSizeBytesLimit: 10000,
+      autoBatchTracing: true,
+      manualFlushMode: true,
+    });
+    const callSpy = jest
+      .spyOn((client as any).batchIngestCaller, "call")
+      .mockResolvedValue({
+        ok: true,
+        text: () => "",
+      });
+    jest.spyOn(client as any, "_getServerInfo").mockImplementation(() => {
+      return {
+        version: "foo",
+        batch_ingest_config: { ...extraBatchIngestConfig },
+      };
+    });
+    const projectName = "__test_batch";
+
+    const runIds = await Promise.all(
+      [...Array(15)].map(async (_, i) => {
+        const runId = uuidv4();
+        const dottedOrder = convertToDottedOrderFormat(
+          new Date().getTime() / 1000,
+          runId
+        );
+        const params = mergeRuntimeEnvIntoRunCreate({
+          id: runId,
+          project_name: projectName,
+          name: "test_run " + i,
+          run_type: "llm",
+          inputs: { text: "hello world " + i },
+          trace_id: runId,
+          dotted_order: dottedOrder,
+        } as RunCreate);
+        // Allow some extra space for other request properties
+        const mockRunSize = 950;
+        const padCount = mockRunSize - JSON.stringify(params).length;
+        params.inputs.text = params.inputs.text + "x".repeat(padCount);
+        await client.createRun(params);
+        return runId;
+      })
+    );
+
+    await new Promise((resolve) => setTimeout(resolve, 500));
+
+    expect(callSpy.mock.calls.length).toBe(0);
+
+    await client.flush();
+
+    expect(callSpy.mock.calls.length).toBe(2);
+
+    const calledRequestParam: any = callSpy.mock.calls[0][2];
+    const calledRequestParam2: any = callSpy.mock.calls[1][2];
+
+    const firstBatchBody = await parseMockRequestBody(
+      calledRequestParam?.body
+    );
+    const secondBatchBody = await parseMockRequestBody(
+      calledRequestParam2?.body
+    );
+
+    const initialBatchBody =
+      firstBatchBody.post.length === 10 ? firstBatchBody : secondBatchBody;
+    const followupBatchBody =
+      firstBatchBody.post.length === 10 ? secondBatchBody : firstBatchBody;
+
+    // Flushing should drain the entire queue in size-limited batches,
+    // sending both batches
+    expect(initialBatchBody).toEqual({
+      post: runIds.slice(0, 10).map((runId, i) =>
+        expect.objectContaining({
+          id: runId,
+          run_type: "llm",
+          inputs: {
+            text: expect.stringContaining("hello world " + i),
+          },
+          trace_id: runId,
+        })
+      ),
+      patch: [],
+    });
+
+    expect(followupBatchBody).toEqual({
+      post: runIds.slice(10).map((runId, i) =>
+        expect.objectContaining({
+          id: runId,
+          run_type: "llm",
+          inputs: {
+            text: expect.stringContaining("hello world " + (i + 10)),
+          },
+          trace_id: runId,
+        })
+      ),
+      patch: [],
+    });
+  });
+
   it("a very low batch size limit should be equivalent to single calls", async () => {
     const client = new Client({
       apiKey: "test-api-key",
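
The 10/5 split the test asserts follows from the numbers it sets up: each run is padded to roughly 950 bytes, so with `batchSizeBytesLimit: 10000` a batch holds at most floor(10000 / 950) = 10 runs, and the 15 queued runs drain on `flush()` as one batch of 10 followed by one of 5.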