Scaffold otel + arize phoenix integration #55

Draft: wants to merge 3 commits into base: main
Changes from all commits
7 changes: 7 additions & 0 deletions packages/cannoli-core/package.json
@@ -36,12 +36,18 @@
"typescript": "^5.3.3"
},
"dependencies": {
"@arizeai/openinference-instrumentation-langchain": "0.2.0",
"@arizeai/openinference-semantic-conventions": "0.10.0",
"@langchain/anthropic": "0.2.1",
"@langchain/community": "0.2.12",
"@langchain/core": "0.2.7",
"@langchain/google-genai": "0.0.19",
"@langchain/groq": "0.0.12",
"@langchain/openai": "0.1.3",
"@opentelemetry/exporter-trace-otlp-proto": "0.53.0",
"@opentelemetry/instrumentation": "0.53.0",
"@opentelemetry/resources": "1.26.0",
"@opentelemetry/sdk-trace-web": "1.26.0",
"js-yaml": "^4.1.0",
"langchain": "0.2.5",
"nanoid": "5.0.7",
@@ -51,6 +57,7 @@
"tiny-invariant": "^1.3.1",
"tslib": "2.4.0",
"tsup": "^8.0.2",
"web-instrumentation-langchain": "workspace:*",
"zod": "3.23.8"
}
}
103 changes: 24 additions & 79 deletions packages/cannoli-core/src/cannoli.ts
@@ -1,87 +1,32 @@
import { Run, RunArgs, Stoppage } from "./run";

export function run({
cannoli,
llmConfigs,
args,
fileManager,
persistor,
actions,
httpTemplates,
replacers,
fetcher,
config,
secrets,
isMock,
resume,
onFinish,
}: RunArgs): [Promise<Stoppage>, () => void] {
let resolver: (stoppage: Stoppage) => void;
const done = new Promise<Stoppage>((resolve) => {
resolver = resolve;
});

const run = new Run({
llmConfigs,
cannoli,
args,
persistor,
onFinish: (stoppage: Stoppage) => {
resolver(stoppage);
if (onFinish) onFinish(stoppage);
},
fileManager,
actions,
httpTemplates,
replacers,
isMock,
fetcher,
config,
secrets,
resume,
});

run.start();

return [done, () => run.stop()];
export function run({ onFinish, ...args }: RunArgs): [Promise<Stoppage>, () => void] {
let resolver: (stoppage: Stoppage) => void;
const done = new Promise<Stoppage>((resolve) => {
resolver = resolve;
});

const run = new Run({
...args,
onFinish: (stoppage: Stoppage) => {
resolver(stoppage);
if (onFinish) onFinish(stoppage);
},
});

run.start();

return [done, () => run.stop()];
}

export async function resultsRun({
cannoli,
llmConfigs,
args,
fileManager,
persistor,
actions,
httpTemplates,
replacers,
fetcher,
config,
secrets,
isMock,
resume,
}: RunArgs): Promise<Record<string, string>> {
const [done] = run({
cannoli,
llmConfigs,
args,
fileManager,
persistor,
actions,
httpTemplates,
replacers,
fetcher,
config,
secrets,
isMock,
resume,
});
export async function resultsRun(args: RunArgs): Promise<Record<string, string>> {
const [done] = run({ ...args });

const stoppage = await done;

if (stoppage.reason === "error") {
throw new Error(`Error occurred during the run: ${stoppage.message}`);
}

return stoppage.results;
}
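
With the destructure-and-reforward boilerplate gone, run and resultsRun simply spread RunArgs through. A minimal caller sketch, assuming illustrative values (the field names come from the old destructured signature above; the canvas value, API key, and model are placeholders):

import { RunArgs } from "./run";
import { resultsRun } from "./cannoli";

// Placeholder for the cannoli/canvas definition that RunArgs expects.
declare const canvasJson: unknown;

const results = await resultsRun({
  cannoli: canvasJson,
  llmConfigs: [{ provider: "openai", apiKey: "sk-...", model: "gpt-4o" }],
  isMock: false,
} as unknown as RunArgs);

console.log(results); // Record<string, string> of run results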
54 changes: 54 additions & 0 deletions packages/cannoli-core/src/instrumentation.ts
@@ -0,0 +1,54 @@
import { WebTracerProvider, SimpleSpanProcessor } from "@opentelemetry/sdk-trace-web"
import { SEMRESATTRS_PROJECT_NAME } from "@arizeai/openinference-semantic-conventions";
import { Resource } from "@opentelemetry/resources"
import * as lcCallbackManager from "@langchain/core/callbacks/manager";
import { LangChainInstrumentation } from "web-instrumentation-langchain";

import { TracingConfig } from "src/run"
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-proto";

const instrumentPhoenixLangchain = () => {
const lcInstrumentation = new LangChainInstrumentation();
lcInstrumentation.manuallyInstrument(lcCallbackManager);

console.log("🔎 Phoenix Langchain instrumentation enabled 🔎")
}

export const createPhoenixWebTracerProvider = ({ tracingConfig }: { tracingConfig: TracingConfig }) => {
if (!tracingConfig.phoenix?.enabled) {
return
}

try {

const provider = new WebTracerProvider({
resource: new Resource({
[SEMRESATTRS_PROJECT_NAME]: tracingConfig.phoenix.projectName,
}),
})

const traceUrl = `${tracingConfig.phoenix.baseUrl.endsWith("/") ? tracingConfig.phoenix.baseUrl : `${tracingConfig.phoenix.baseUrl}/`}v1/traces`
// provider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter()))
provider.addSpanProcessor(new SimpleSpanProcessor(new OTLPTraceExporter({
url: traceUrl,
headers: {
// allow cross-origin requests
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization, Content-Length, X-Requested-With, Accept, Origin",
"Access-Control-Allow-Credentials": "true",
}
})))

provider.register()

console.log("🔎 Phoenix tracing enabled 🔎")

instrumentPhoenixLangchain()

return provider
} catch (error) {
console.error("Error enabling Phoenix tracing", error)
}
}
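
A usage sketch for the new module, assuming a TracingConfig whose shape matches the fields read above (enabled, baseUrl, projectName); the URL and project name are placeholders, and the provider should be created once, before any LangChain calls are made:

import { createPhoenixWebTracerProvider } from "./instrumentation";

const provider = createPhoenixWebTracerProvider({
  tracingConfig: {
    phoenix: {
      enabled: true,
      baseUrl: "http://localhost:6006", // Phoenix's usual local endpoint (assumed)
      projectName: "cannoli",
    },
  },
});

// On teardown, flush buffered spans; provider is undefined when tracing is disabled or setup failed.
await provider?.shutdown();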

37 changes: 30 additions & 7 deletions packages/cannoli-core/src/providers.ts
@@ -20,6 +20,7 @@ export type SupportedProviders = "openai" | "ollama" | "gemini" | "anthropic" |

import { z } from "zod";
import invariant from "tiny-invariant";
import { TracingConfig } from "src/run";

export const GenericFunctionCallSchema = z.object({
name: z.string(),
@@ -69,7 +70,11 @@ export type LLMConfig = (Omit<GenericModelConfig, "provider"> & { provider: Supp

type ConstructorArgs = {
configs: LLMConfig[];
tracingConfig?: TracingConfig | null;
valtownApiKey?: string;
runId?: string;
runName?: string;
runDateEpochMs?: number;
};

export type GenericCompletionParams = {
@@ -105,6 +110,10 @@ export class LLMProvider {
getDefaultConfigByProvider?: GetDefaultsByProvider;
initialized = false;
valtownApiKey?: string;
tracingConfig?: TracingConfig | null;
runId?: string;
runName?: string;
runDateEpochMs?: number;

constructor(initArgs: ConstructorArgs) {
this.init(initArgs);
@@ -115,6 +124,10 @@
this.provider = initArgs.configs[0].provider as SupportedProviders;
this.baseConfig = initArgs.configs[0];
this.valtownApiKey = initArgs.valtownApiKey;
this.tracingConfig = initArgs.tracingConfig;
this.runId = initArgs.runId;
this.runDateEpochMs = initArgs.runDateEpochMs;
this.runName = initArgs.runName;
this.getDefaultConfigByProvider = (provider: SupportedProviders) => {
return initArgs.configs.find((config) => config.provider === provider);
}
@@ -166,9 +179,10 @@
const url = urlString || undefined;
const query = queryString ? Object.fromEntries(new URLSearchParams(queryString).entries()) : undefined

let client: BaseChatModel;
switch (provider) {
case "openai":
return new ChatOpenAI({
client = new ChatOpenAI({
apiKey: config.apiKey,
model: config.model,
temperature: config.temperature,
@@ -187,8 +201,9 @@
defaultQuery: query
}
});
break;
case "azure_openai":
return new AzureChatOpenAI({
client = new AzureChatOpenAI({
temperature: config.temperature,
model: config.model,
apiKey: config.apiKey,
@@ -212,9 +227,10 @@
defaultQuery: query,
}
});
break;
case "ollama":
if (args?.hasFunctionCall) {
return new OllamaFunctions({
client = new OllamaFunctions({
baseUrl: url,
model: config.model,
temperature: config.temperature,
@@ -223,9 +239,10 @@
presencePenalty: config.presence_penalty,
stop: config.stop?.split(","),
});
break;
}

return new ChatOllama({
client = new ChatOllama({
baseUrl: url,
model: config.model,
temperature: config.temperature,
@@ -234,8 +251,9 @@
presencePenalty: config.presence_penalty,
stop: config.stop?.split(","),
});
break;
case "gemini":
return new ChatGoogleGenerativeAI({
client = new ChatGoogleGenerativeAI({
maxRetries: 3,
model: config.model,
apiKey: config.apiKey,
@@ -244,8 +262,9 @@
topP: config.top_p,
stopSequences: config.stop?.split(","),
});
break;
case "anthropic":
return new ChatAnthropic({
client = new ChatAnthropic({
apiKey: config.apiKey,
model: config.model,
temperature: config.temperature,
@@ -261,17 +280,21 @@
},
}
});
break;
case "groq":
return new ChatGroq({
client = new ChatGroq({
apiKey: config.apiKey,
model: config.model,
temperature: config.temperature,
stopSequences: config.stop?.split(","),
maxRetries: 3,
});
break;
default:
throw new Error("Unsupported provider");
}

return client.withConfig({ metadata: { runId: this.runId, runName: this.runName, runDateEpochMs: this.runDateEpochMs } }) as unknown as BaseChatModel;
};

static convertMessages = (
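
Instead of returning each chat client directly, the switch now assigns it to client and routes every provider through a single withConfig call, so the run-scoped metadata (runId, runName, runDateEpochMs) is attached to every LangChain invocation and ends up on the exported spans. A hedged sketch of supplying that metadata when constructing the provider; the field names mirror the ConstructorArgs additions above, while the config values and run name are placeholders:

import { LLMProvider } from "./providers";
import { TracingConfig } from "./run";

// Same config object handed to createPhoenixWebTracerProvider.
declare const tracingConfig: TracingConfig;

const llm = new LLMProvider({
  configs: [{ provider: "openai", apiKey: "sk-...", model: "gpt-4o" }],
  tracingConfig,
  runId: crypto.randomUUID(), // correlates every span emitted during one cannoli run
  runName: "daily-summary",
  runDateEpochMs: Date.now(),
});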