Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(js): Adds manual flush mode #1351

Merged
merged 3 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "langsmith",
"version": "0.2.13",
"version": "0.2.14",
"description": "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform.",
"packageManager": "[email protected]",
"files": [
Expand Down
38 changes: 34 additions & 4 deletions js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@
blockOnRootRunFinalization?: boolean;
traceBatchConcurrency?: number;
fetchOptions?: RequestInit;
/**
* Whether to require manual .flush() calls before sending traces.
* Useful if encountering network rate limits at high trace volumes.
*/
manualFlushMode?: boolean;
}

/**
Expand Down Expand Up @@ -424,7 +429,7 @@
// If there is an item on the queue we were unable to pop,
// just return it as a single batch.
if (popped.length === 0 && this.items.length > 0) {
const item = this.items.shift()!;

Check warning on line 432 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Forbidden non-null assertion
popped.push(item);
poppedSizeBytes += item.size;
this.sizeBytes -= item.size;
Expand Down Expand Up @@ -488,6 +493,8 @@
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private _getServerInfoPromise?: Promise<Record<string, any>>;

private manualFlushMode = false;

constructor(config: ClientConfig = {}) {
const defaultConfig = Client.getDefaultClientConfig();

Expand Down Expand Up @@ -525,6 +532,7 @@
config.blockOnRootRunFinalization ?? this.blockOnRootRunFinalization;
this.batchSizeBytesLimit = config.batchSizeBytesLimit;
this.fetchOptions = config.fetchOptions || {};
this.manualFlushMode = config.manualFlushMode ?? this.manualFlushMode;
}

public static getDefaultClientConfig(): {
Expand Down Expand Up @@ -775,14 +783,17 @@
}

/**
 * Pops batches off the auto-batch queue until it is empty and hands each one
 * to `_processBatch` exactly once.
 *
 * @param batchSizeLimit Size limit forwarded to `autoBatchQueue.pop` for every batch.
 * @returns A promise that resolves once every batch started by this call has
 * settled. Rejections from `_processBatch` are logged via `console.error`
 * instead of being propagated to the caller.
 */
private drainAutoBatchQueue(batchSizeLimit: number) {
  const promises = [];
  while (this.autoBatchQueue.items.length > 0) {
    const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit);
    if (!batch.length) {
      // Nothing was popped: call `done` and stop rather than spin forever
      // on a queue that still reports items.
      done();
      break;
    }
    // Keep the promise (instead of fire-and-forget) so callers can await
    // completion of the whole drain.
    const batchPromise = this._processBatch(batch, done).catch(console.error);
    promises.push(batchPromise);
  }
  return Promise.all(promises);
}

private async _processBatch(batch: AutoBatchQueueItem[], done: () => void) {
Expand Down Expand Up @@ -817,14 +828,18 @@
item.item = mergeRuntimeEnvIntoRunCreate(item.item as RunCreate);
}
const itemPromise = this.autoBatchQueue.push(item);
if (this.manualFlushMode) {
// Rely on manual flushing in serverless environments
return itemPromise;
}
const sizeLimitBytes = await this._getBatchSizeLimitBytes();
if (this.autoBatchQueue.sizeBytes > sizeLimitBytes) {
this.drainAutoBatchQueue(sizeLimitBytes);
void this.drainAutoBatchQueue(sizeLimitBytes);
}
if (this.autoBatchQueue.items.length > 0) {
this.autoBatchTimeout = setTimeout(() => {
this.autoBatchTimeout = undefined;
this.drainAutoBatchQueue(sizeLimitBytes);
void this.drainAutoBatchQueue(sizeLimitBytes);
}, this.autoBatchAggregationDelayMs);
}
return itemPromise;
Expand All @@ -847,7 +862,7 @@
if (this._serverInfo === undefined) {
try {
this._serverInfo = await this._getServerInfo();
} catch (e) {

Check warning on line 865 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

'e' is defined but never used. Allowed unused args must match /^_/u
console.warn(
`[WARNING]: LangSmith failed to fetch info on supported operations. Falling back to batch operations and default limits.`
);
Expand All @@ -872,6 +887,14 @@
return await this.settings;
}

/**
 * Sends whatever traces are sitting in the auto-batch queue right now.
 *
 * Looks up the batch size limit through `_getBatchSizeLimitBytes()` and drains
 * the queue with it; the returned promise resolves once
 * `drainAutoBatchQueue` has resolved.
 */
public async flush() {
  const batchByteLimit = await this._getBatchSizeLimitBytes();
  await this.drainAutoBatchQueue(batchByteLimit);
}

public async createRun(run: CreateRunParams): Promise<void> {
if (!this._filterForSampling([run]).length) {
return;
Expand Down Expand Up @@ -1239,7 +1262,8 @@
if (
run.end_time !== undefined &&
data.parent_run_id === undefined &&
this.blockOnRootRunFinalization
this.blockOnRootRunFinalization &&
!this.manualFlushMode
) {
// Trigger batches as soon as a root trace ends and wait to ensure trace finishes
// in serverless environments.
Expand Down Expand Up @@ -1573,7 +1597,7 @@
treeFilter?: string;
isRoot?: boolean;
dataSourceType?: string;
}): Promise<any> {

Check warning on line 1600 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
let projectIds_ = projectIds || [];
if (projectNames) {
projectIds_ = [
Expand Down Expand Up @@ -1861,7 +1885,7 @@
`Failed to list shared examples: ${response.status} ${response.statusText}`
);
}
return result.map((example: any) => ({

Check warning on line 1888 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
...example,
_hostUrl: this.getHostUrl(),
}));
Expand Down Expand Up @@ -1998,7 +2022,7 @@
}
// projectId querying
return true;
} catch (e) {

Check warning on line 2025 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

'e' is defined but never used. Allowed unused args must match /^_/u
return false;
}
}
Expand Down Expand Up @@ -3373,7 +3397,7 @@
async _logEvaluationFeedback(
evaluatorResponse: EvaluationResult | EvaluationResults,
run?: Run,
sourceInfo?: { [key: string]: any }

Check warning on line 3400 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
): Promise<[results: EvaluationResult[], feedbacks: Feedback[]]> {
const evalResults: Array<EvaluationResult> =
this._selectEvalResults(evaluatorResponse);
Expand Down Expand Up @@ -3412,7 +3436,7 @@
public async logEvaluationFeedback(
evaluatorResponse: EvaluationResult | EvaluationResults,
run?: Run,
sourceInfo?: { [key: string]: any }

Check warning on line 3439 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
): Promise<EvaluationResult[]> {
const [results] = await this._logEvaluationFeedback(
evaluatorResponse,
Expand Down Expand Up @@ -3862,7 +3886,7 @@

public async createCommit(
promptIdentifier: string,
object: any,

Check warning on line 3889 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
options?: {
parentCommitHash?: string;
}
Expand Down Expand Up @@ -4094,7 +4118,7 @@
isPublic?: boolean;
isArchived?: boolean;
}
): Promise<Record<string, any>> {

Check warning on line 4121 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
if (!(await this.promptExists(promptIdentifier))) {
throw new Error("Prompt does not exist, you must create it first.");
}
Expand All @@ -4105,7 +4129,7 @@
throw await this._ownerConflictError("update a prompt", owner);
}

const payload: Record<string, any> = {};

Check warning on line 4132 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type

if (options?.description !== undefined)
payload.description = options.description;
Expand Down Expand Up @@ -4381,6 +4405,12 @@
* @returns A promise that resolves once all currently pending traces have been sent.
*/
public awaitPendingTraceBatches() {
if (this.manualFlushMode) {
console.warn(
"[WARNING]: When tracing in manual flush mode, you must call `await client.flush()` manually to submit trace batches."
);
return Promise.resolve();
}
return Promise.all([
...this.autoBatchQueue.items.map(({ itemPromise }) => itemPromise),
this.batchIngestCaller.queue.onIdle(),
Expand Down
2 changes: 1 addition & 1 deletion js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ export { RunTree, type RunTreeConfig } from "./run_trees.js";
export { overrideFetchImplementation } from "./singletons/fetch.js";

// Update using yarn bump-version
/** Version string exported by this package; kept in step with the `version` field of package.json. */
export const __version__ = "0.2.14";
100 changes: 100 additions & 0 deletions js/src/tests/batch_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,106 @@ describe.each(ENDPOINT_TYPES)(
});
});

// Verifies manual flush mode: runs created on a client configured with
// `manualFlushMode: true` are not sent until `client.flush()` is called, and
// the flush then sends them as two batches (10 runs and 5 runs).
it("should flush traces in batches with manualFlushMode enabled", async () => {
const client = new Client({
apiKey: "test-api-key",
batchSizeBytesLimit: 10000,
autoBatchTracing: true,
manualFlushMode: true,
});
// Stub the batch ingest caller so the test can count and inspect the calls
// it receives instead of performing them.
const callSpy = jest
.spyOn((client as any).batchIngestCaller, "call")
.mockResolvedValue({
ok: true,
text: () => "",
});
// Return canned server info rather than fetching it.
jest.spyOn(client as any, "_getServerInfo").mockImplementation(() => {
return {
version: "foo",
batch_ingest_config: { ...extraBatchIngestConfig },
};
});
const projectName = "__test_batch";

// Create 15 runs, each padded so that its serialized params are
// `mockRunSize` characters long.
const runIds = await Promise.all(
[...Array(15)].map(async (_, i) => {
const runId = uuidv4();
const dottedOrder = convertToDottedOrderFormat(
new Date().getTime() / 1000,
runId
);
const params = mergeRuntimeEnvIntoRunCreate({
id: runId,
project_name: projectName,
name: "test_run " + i,
run_type: "llm",
inputs: { text: "hello world " + i },
trace_id: runId,
dotted_order: dottedOrder,
} as RunCreate);
// Allow some extra space for other request properties
const mockRunSize = 950;
const padCount = mockRunSize - JSON.stringify(params).length;
params.inputs.text = params.inputs.text + "x".repeat(padCount);
await client.createRun(params);
return runId;
})
);

// Wait long enough that any automatic batching would have fired.
await new Promise((resolve) => setTimeout(resolve, 500));

// In manual flush mode nothing may be sent before flush() is called.
expect(callSpy.mock.calls.length).toBe(0);

await client.flush();

// The flush is expected to send exactly two batches.
expect(callSpy.mock.calls.length).toBe(2);

// The third argument of each recorded call carries the request whose body
// is parsed below.
const calledRequestParam: any = callSpy.mock.calls[0][2];
const calledRequestParam2: any = callSpy.mock.calls[1][2];

const firstBatchBody = await parseMockRequestBody(
calledRequestParam?.body
);
const secondBatchBody = await parseMockRequestBody(
calledRequestParam2?.body
);

// The two calls may be recorded in either order; treat whichever body holds
// 10 posts as the initial batch.
const initialBatchBody =
firstBatchBody.post.length === 10 ? firstBatchBody : secondBatchBody;
const followupBatchBody =
firstBatchBody.post.length === 10 ? secondBatchBody : firstBatchBody;

// Nothing is sent until flush() is called; flush() then drains the queue,
// sending both batches
expect(initialBatchBody).toEqual({
post: runIds.slice(0, 10).map((runId, i) =>
expect.objectContaining({
id: runId,
run_type: "llm",
inputs: {
text: expect.stringContaining("hello world " + i),
},
trace_id: runId,
})
),
patch: [],
});

// The remaining five runs (indices 10-14) make up the second batch.
expect(followupBatchBody).toEqual({
post: runIds.slice(10).map((runId, i) =>
expect.objectContaining({
id: runId,
run_type: "llm",
inputs: {
text: expect.stringContaining("hello world " + (i + 10)),
},
trace_id: runId,
})
),
patch: [],
});
});

it("a very low batch size limit should be equivalent to single calls", async () => {
const client = new Client({
apiKey: "test-api-key",
Expand Down
Loading