From febf4d8aa6bd0a6397977053b8938c4875e7153f Mon Sep 17 00:00:00 2001
From: marknguyen1302
Date: Tue, 2 Jul 2024 13:39:50 +0700
Subject: [PATCH] feat: add analytic metric, send benchmark to server

---
 cortex-js/src/app.ts                          |   4 +
 cortex-js/src/command.ts                      |   7 +-
 .../repositories/telemetry.interface.ts       |  30 +++-
 .../telemetry/crash-report.interface.ts       |  16 ---
 .../domain/telemetry/telemetry.interface.ts   |  38 +++--
 .../infrastructure/commanders/chat.command.ts |  19 ++-
 .../infrastructure/commanders/init.command.ts |  14 ++
 .../commanders/models/model-pull.command.ts   |  16 ++-
 .../usecases/benchmark.cli.usecases.ts        |  64 ++++++---
 .../usecases/cli.usecases.module.ts           |   2 +
 .../controllers/chat.controller.ts            |  14 +-
 .../controllers/models.controller.ts          |  23 ++-
 .../telemetry/telemetry.repository.ts         | 119 ++++++++++++---
 .../file-manager/file-manager.service.ts      |  27 ++++
 cortex-js/src/usecases/chat/chat.usecases.ts  |   1 -
 .../usecases/telemetry/telemetry.usecases.ts  | 136 +++++++++++++++++-
 16 files changed, 449 insertions(+), 81 deletions(-)
 delete mode 100644 cortex-js/src/domain/telemetry/crash-report.interface.ts

diff --git a/cortex-js/src/app.ts b/cortex-js/src/app.ts
index fd0db7123..3394a6f35 100644
--- a/cortex-js/src/app.ts
+++ b/cortex-js/src/app.ts
@@ -3,6 +3,7 @@ import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger';
 import { AppModule } from './app.module';
 import { FileManagerService } from './infrastructure/services/file-manager/file-manager.service';
 import { ValidationPipe } from '@nestjs/common';
+import { TelemetryUsecases } from './usecases/telemetry/telemetry.usecases';
 export const getApp = async () => {
   const app = await NestFactory.create(AppModule, {
     snapshot: true,
@@ -16,6 +17,9 @@ export const getApp = async () => {
   const fileService = app.get(FileManagerService);
   await fileService.getConfig();
 
+  const telemetryService = await app.resolve(TelemetryUsecases);
+  await telemetryService.initInterval();
+
   app.useGlobalPipes(
     new ValidationPipe({
       transform: true,
diff --git a/cortex-js/src/command.ts b/cortex-js/src/command.ts
index e85d7050f..274e401b3 100644
--- a/cortex-js/src/command.ts
+++ b/cortex-js/src/command.ts
@@ -15,10 +15,12 @@ async function bootstrap() {
     logger: ['warn', 'error'],
     errorHandler: async (error) => {
       await telemetryUseCase!.createCrashReport(error, TelemetrySource.CLI);
+      console.error(error);
       process.exit(1);
     },
     serviceErrorHandler: async (error) => {
       await telemetryUseCase!.createCrashReport(error, TelemetrySource.CLI);
+      console.error(error);
       process.exit(1);
     },
   });
@@ -26,10 +28,13 @@ async function bootstrap() {
 
   telemetryUseCase = await app.resolve(TelemetryUsecases);
   contextService = await app.resolve(ContextService);
-  telemetryUseCase!.sendCrashReport();
+  const anonymousData = await telemetryUseCase!.updateAnonymousData();
 
   await contextService!.init(async () => {
     contextService!.set('source', TelemetrySource.CLI);
+    contextService!.set('sessionId', anonymousData?.sessionId);
+    telemetryUseCase!.sendActivationEvent(TelemetrySource.CLI);
+    telemetryUseCase!.sendCrashReport();
     return CommandFactory.runApplication(app);
   });
 }
diff --git a/cortex-js/src/domain/repositories/telemetry.interface.ts b/cortex-js/src/domain/repositories/telemetry.interface.ts
index f67d77aeb..ff33edea1 100644
--- a/cortex-js/src/domain/repositories/telemetry.interface.ts
+++ b/cortex-js/src/domain/repositories/telemetry.interface.ts
@@ -1,6 +1,10 @@
+import { ModelStat } from '@/infrastructure/commanders/types/model-stat.interface';
 import {
+  BenchmarkHardware,
   CrashReportAttributes,
+  EventAttributes,
   Telemetry,
+  TelemetryAnonymized,
   TelemetrySource,
 } from '../telemetry/telemetry.interface';
 
@@ -8,15 +12,39 @@ export abstract class TelemetryRepository {
   abstract readCrashReports(
     callback: (Telemetry: Telemetry) => void,
   ): Promise<void>;
+
   abstract createCrashReport(
     crashReport: CrashReportAttributes,
     source?: TelemetrySource,
   ): Promise<void>;
+
   abstract getLastCrashReport(): Promise<Telemetry | null>;
+
   abstract markLastCrashReportAsSent(): Promise<void>;
+
   abstract sendTelemetryToOTelCollector(
     endpoint: string,
     telemetry: Telemetry,
   ): Promise<void>;
-  abstract sendTelemetryToServer(telemetry: Telemetry): Promise<void>;
+
+  abstract sendTelemetryToServer(
+    telemetryEvent: Telemetry['event'],
+  ): Promise<void>;
+
+  abstract sendEvent(
+    events: EventAttributes[],
+    source: TelemetrySource,
+  ): Promise<void>;
+
+  abstract getAnonymizedData(): Promise<TelemetryAnonymized | null>;
+
+  abstract updateAnonymousData(data: TelemetryAnonymized): Promise<void>;
+
+  abstract sendBenchmarkToServer(data: {
+    hardware: BenchmarkHardware;
+    results: any;
+    metrics: any;
+    model: ModelStat;
+    sessionId: string;
+  }): Promise<void>;
 }
diff --git a/cortex-js/src/domain/telemetry/crash-report.interface.ts b/cortex-js/src/domain/telemetry/crash-report.interface.ts
deleted file mode 100644
index 7eb55a707..000000000
--- a/cortex-js/src/domain/telemetry/crash-report.interface.ts
+++ /dev/null
@@ -1,16 +0,0 @@
-export interface TelemetryResource {
-  timestamp: number;
-  osName: string;
-  osVersion: string;
-  appVersion: string;
-  architecture: string;
-}
-
-export interface CrashReportAttributes {
-  timestamp: number;
-  modelId: string;
-  operation: string;
-  params: any;
-  contextLength: number;
-  tokenPerSecond: number;
-}
diff --git a/cortex-js/src/domain/telemetry/telemetry.interface.ts b/cortex-js/src/domain/telemetry/telemetry.interface.ts
index b1ed951e2..c897a436e 100644
--- a/cortex-js/src/domain/telemetry/telemetry.interface.ts
+++ b/cortex-js/src/domain/telemetry/telemetry.interface.ts
@@ -8,14 +8,7 @@ export interface TelemetryResource {
   source?: TelemetrySource;
   cpu?: string;
   gpus?: string;
-  // todo: consider about sessionId
-  // sessionId: string;
-}
-
-export interface CrashReportEvent {
-  modelId?: string;
-  sessionId: string;
-  stack?: string;
+  sessionId?: string;
 }
 
 export interface Resource {
@@ -77,6 +70,7 @@ export interface CrashReportAttributes {
   stack?: string;
   message: string;
   payload: CrashReportPayload;
+  sessionId?: string;
 }
 
 export interface TelemetryEventMetadata {
@@ -90,3 +84,31 @@ export interface Telemetry {
     resourceLogs: TelemetryEvent[];
   };
 }
+
+export enum EventName {
+  INIT = 'init',
+  DOWNLOAD_MODEL = 'download-model',
+  CHAT = 'chat',
+  ACTIVATE = 'activate',
+  NEW_ACTIVATE = 'new_activate',
+}
+
+export interface EventAttributes {
+  name: string;
+  modelId?: string;
+  sessionId?: string;
+}
+
+export interface TelemetryAnonymized {
+  sessionId: string | null;
+  lastActiveAt?: string | null;
+}
+export interface BenchmarkHardware {
+  gpu: any[];
+  cpu: any;
+  board: any;
+  disk: any[];
+  chassis: any;
+  memLayout: any[];
+  os: any;
+}
diff --git a/cortex-js/src/infrastructure/commanders/chat.command.ts b/cortex-js/src/infrastructure/commanders/chat.command.ts
index 0124ab54c..8b72c7b37 100644
--- a/cortex-js/src/infrastructure/commanders/chat.command.ts
+++ b/cortex-js/src/infrastructure/commanders/chat.command.ts
@@ -10,6 +10,11 @@ import { PSCliUsecases } from './usecases/ps.cli.usecases';
 import { ModelsUsecases } from '@/usecases/models/models.usecases';
 import { SetCommandContext } from './decorators/CommandContext';
 import { ModelStat } from './types/model-stat.interface';
+import { TelemetryUsecases } from '@/usecases/telemetry/telemetry.usecases';
+import {
+  EventName,
+  TelemetrySource,
+} from '@/domain/telemetry/telemetry.interface';
 import { ContextService } from '../services/context/context.service';
 
 type ChatOptions = {
@@ -36,6 +41,7 @@ export class ChatCommand extends CommandRunner {
     private readonly modelsUsecases: ModelsUsecases,
     private readonly psCliUsecases: PSCliUsecases,
     readonly contextService: ContextService,
+    private readonly telemetryUsecases: TelemetryUsecases,
   ) {
     super();
   }
@@ -66,14 +72,23 @@ export class ChatCommand extends CommandRunner {
     }
 
     if (!message) options.attach = true;
-
-    return this.chatCliUsecases.chat(
+    const result = await this.chatCliUsecases.chat(
       modelId,
       options.threadId,
       message, // Accept both message from inputs or arguments
       options.attach,
       false, // Do not stop cortex session or loaded model
     );
+    this.telemetryUsecases.sendEvent(
+      [
+        {
+          name: EventName.CHAT,
+          modelId,
+        },
+      ],
+      TelemetrySource.CLI,
+    );
+    return result;
   }
 
   modelInquiry = async (models: ModelStat[]) => {
diff --git a/cortex-js/src/infrastructure/commanders/init.command.ts b/cortex-js/src/infrastructure/commanders/init.command.ts
index 583b26cf8..38cde473b 100644
--- a/cortex-js/src/infrastructure/commanders/init.command.ts
+++ b/cortex-js/src/infrastructure/commanders/init.command.ts
@@ -7,6 +7,11 @@ import {
 import { InitCliUsecases } from './usecases/init.cli.usecases';
 import { InitOptions } from './types/init-options.interface';
 import { SetCommandContext } from './decorators/CommandContext';
+import { TelemetryUsecases } from '@/usecases/telemetry/telemetry.usecases';
+import {
+  EventName,
+  TelemetrySource,
+} from '@/domain/telemetry/telemetry.interface';
 import { ContextService } from '../services/context/context.service';
 
 @SubCommand({
@@ -24,6 +29,7 @@ export class InitCommand extends CommandRunner {
     private readonly inquirerService: InquirerService,
     private readonly initUsecases: InitCliUsecases,
     readonly contextService: ContextService,
+    private readonly telemetryUsecases: TelemetryUsecases,
   ) {
     super();
   }
@@ -42,6 +48,14 @@ export class InitCommand extends CommandRunner {
 
     const version = passedParams[0] ?? 'latest';
     await this.initUsecases.installEngine(options, version);
+    this.telemetryUsecases.sendEvent(
+      [
+        {
+          name: EventName.INIT,
+        },
+      ],
+      TelemetrySource.CLI,
+    );
 
   }
 }
diff --git a/cortex-js/src/infrastructure/commanders/models/model-pull.command.ts b/cortex-js/src/infrastructure/commanders/models/model-pull.command.ts
index c667389c5..e3397bbbd 100644
--- a/cortex-js/src/infrastructure/commanders/models/model-pull.command.ts
+++ b/cortex-js/src/infrastructure/commanders/models/model-pull.command.ts
@@ -3,6 +3,11 @@ import { exit } from 'node:process';
 import { SetCommandContext } from '../decorators/CommandContext';
 import { ModelsCliUsecases } from '@commanders/usecases/models.cli.usecases';
 import { ModelNotFoundException } from '@/infrastructure/exception/model-not-found.exception';
+import { TelemetryUsecases } from '@/usecases/telemetry/telemetry.usecases';
+import {
+  EventName,
+  TelemetrySource,
+} from '@/domain/telemetry/telemetry.interface';
 import { ContextService } from '@/infrastructure/services/context/context.service';
 import { existsSync } from 'fs';
 import { join } from 'node:path';
@@ -25,6 +30,7 @@ export class ModelPullCommand extends CommandRunner {
     private readonly initUsecases: InitCliUsecases,
     private readonly fileService: FileManagerService,
     readonly contextService: ContextService,
+    private readonly telemetryUsecases: TelemetryUsecases,
   ) {
     super();
   }
@@ -59,7 +65,15 @@ export class ModelPullCommand extends CommandRunner {
         engine,
       );
     }
-
+    this.telemetryUsecases.sendEvent(
+      [
+        {
+          name: EventName.DOWNLOAD_MODEL,
+          modelId: passedParams[0],
+        },
+      ],
+      TelemetrySource.CLI,
+    );
     console.log('\nDownload complete!');
     exit(0);
   }
diff --git a/cortex-js/src/infrastructure/commanders/usecases/benchmark.cli.usecases.ts b/cortex-js/src/infrastructure/commanders/usecases/benchmark.cli.usecases.ts
index 849357188..b516eabc2 100644
--- a/cortex-js/src/infrastructure/commanders/usecases/benchmark.cli.usecases.ts
+++ b/cortex-js/src/infrastructure/commanders/usecases/benchmark.cli.usecases.ts
@@ -16,6 +16,9 @@ import { CortexUsecases } from '@/usecases/cortex/cortex.usecases';
 import { inspect } from 'util';
 import { defaultBenchmarkConfiguration } from '@/infrastructure/constants/benchmark';
 import { PSCliUsecases } from './ps.cli.usecases';
+import { ModelStat } from '../types/model-stat.interface';
+import { BenchmarkHardware } from '@/domain/telemetry/telemetry.interface';
+import { TelemetryUsecases } from '@/usecases/telemetry/telemetry.usecases';
 
 @Injectable()
 export class BenchmarkCliUsecases {
@@ -24,6 +27,7 @@ export class BenchmarkCliUsecases {
     private readonly cortexUsecases: CortexUsecases,
     private readonly fileService: FileManagerService,
     private readonly psUsecases: PSCliUsecases,
+    private readonly telemetryUsecases: TelemetryUsecases,
   ) {}
 
   config: BenchmarkConfig;
@@ -41,7 +45,7 @@ export class BenchmarkCliUsecases {
       ...options,
     };
 
-    const model = params?.model ?? this.config.api.parameters.model;
+    const modelId = params?.model ?? this.config.api.parameters.model;
     this.cortexClient = new Cortex({
       apiKey: this.config.api.api_key,
       baseURL: this.config.api.base_url,
@@ -52,20 +56,20 @@ export class BenchmarkCliUsecases {
       detached: false,
       shell: process.platform == 'win32',
     });
-
     return this.cortexUsecases
       .startCortex()
-      .then(() => this.modelsCliUsecases.startModel(model))
+      .then(() => this.modelsCliUsecases.startModel(modelId))
       .then(() =>
         this.psUsecases
           .getModels()
-          .then((models) => models.find((e) => e.modelId === model)),
+          .then((models) => models.find((e) => e.modelId === modelId)),
       )
       .then((model) => {
         if (!model)
          throw new Error('Model is not started, please try again!');
+        return model;
       })
-      .then(() => this.runBenchmarks(model))
+      .then((model) => this.runBenchmarks(model))
       .then(() => {
         serveProcess.kill();
         process.exit(0);
@@ -101,11 +105,22 @@ export class BenchmarkCliUsecases {
    * using the systeminformation library
    * @returns the system resources
    */
-  private async getSystemResources() {
+  private async getSystemResources(): Promise<
+    BenchmarkHardware & {
+      cpuLoad: any;
+      mem: any;
+    }
+  > {
     return {
-      cpu: await si.currentLoad(),
+      cpuLoad: await si.currentLoad(),
       mem: await si.mem(),
       gpu: (await si.graphics()).controllers,
+      cpu: await si.cpu(),
+      board: await si.baseboard(),
+      disk: await si.diskLayout(),
+      chassis: await si.chassis(),
+      memLayout: await si.memLayout(),
+      os: await si.osInfo(),
     };
   }
 
@@ -117,10 +132,10 @@ export class BenchmarkCliUsecases {
    */
   private async getResourceChange(startData: any, endData: any) {
     return {
-      cpu:
-        startData.cpu && endData.cpu
-          ? ((endData.cpu.currentload - startData.cpu.currentload) /
-              startData.cpu.currentload) *
+      cpuLoad:
+        startData.cpuLoad && endData.cpuLoad
+          ? ((endData.cpuLoad.currentLoad - startData.cpuLoad.currentLoad) /
+              startData.cpuLoad.currentLoad) *
             100
           : null,
       mem:
@@ -135,21 +150,15 @@ export class BenchmarkCliUsecases {
    * Benchmark a user using the OpenAI API
    * @returns
    */
-  private async benchmarkUser(model: string) {
+  private async benchmarkUser(model: ModelStat) {
     const startResources = await this.getSystemResources();
     const start = Date.now();
     let tokenCount = 0;
     let firstTokenTime = null;
     try {
-      console.log('Benchmarking user...', {
-        model,
-        messages: this.config.api.parameters.messages,
-        max_tokens: this.config.api.parameters.max_tokens,
-        stream: true,
-      });
       const stream = await this.cortexClient!.chat.completions.create({
-        model,
+        model: model.modelId,
         messages: this.config.api.parameters.messages,
         max_tokens: this.config.api.parameters.max_tokens,
         stream: true,
       });
@@ -210,7 +219,7 @@ export class BenchmarkCliUsecases {
   /**
    * Run the benchmarks
    */
-  private async runBenchmarks(model: string) {
+  private async runBenchmarks(model: ModelStat) {
     const allResults: any[] = [];
     const rounds = this.config.num_rounds || 1;
 
@@ -262,6 +271,7 @@ export class BenchmarkCliUsecases {
       hardware: await this.getSystemResources(),
       results: allResults,
       metrics,
+      model,
     };
 
     bar.stop();
@@ -269,6 +279,20 @@ export class BenchmarkCliUsecases {
       await this.fileService.getBenchmarkPath(),
       'output.json',
     );
+    await this.telemetryUsecases.sendBenchmarkEvent({
+      hardware: {
+        cpu: output.hardware.cpu,
+        gpu: output.hardware.gpu,
+        memLayout: output.hardware.memLayout,
+        board: output.hardware.board,
+        disk: output.hardware.disk,
+        chassis: output.hardware.chassis,
+        os: output.hardware.os,
+      },
+      results: output.results,
+      metrics: output.metrics,
+      model,
+    });
     fs.writeFileSync(outputFilePath, JSON.stringify(output, null, 2));
 
     console.log(`Benchmark results and metrics saved to ${outputFilePath}`);
diff --git a/cortex-js/src/infrastructure/commanders/usecases/cli.usecases.module.ts b/cortex-js/src/infrastructure/commanders/usecases/cli.usecases.module.ts
index 0c2576ed7..8f33fecd6 100644
--- a/cortex-js/src/infrastructure/commanders/usecases/cli.usecases.module.ts
+++ b/cortex-js/src/infrastructure/commanders/usecases/cli.usecases.module.ts
@@ -12,6 +12,7 @@ import { MessagesModule } from '@/usecases/messages/messages.module';
 import { FileManagerModule } from '@/infrastructure/services/file-manager/file-manager.module';
 import { PSCliUsecases } from './ps.cli.usecases';
 import { BenchmarkCliUsecases } from './benchmark.cli.usecases';
+import { TelemetryModule } from '@/usecases/telemetry/telemetry.module';
 
 @Module({
   imports: [
@@ -23,6 +24,7 @@ import { BenchmarkCliUsecases } from './benchmark.cli.usecases';
     AssistantsModule,
     MessagesModule,
     FileManagerModule,
+    TelemetryModule,
   ],
   providers: [
     InitCliUsecases,
diff --git a/cortex-js/src/infrastructure/controllers/chat.controller.ts b/cortex-js/src/infrastructure/controllers/chat.controller.ts
index f86e6bbf4..f61daca54 100644
--- a/cortex-js/src/infrastructure/controllers/chat.controller.ts
+++ b/cortex-js/src/infrastructure/controllers/chat.controller.ts
@@ -4,11 +4,19 @@ import { ChatUsecases } from '@/usecases/chat/chat.usecases';
 import { Response } from 'express';
 import { ApiOperation, ApiTags, ApiResponse } from '@nestjs/swagger';
 import { ChatCompletionResponseDto } from '../dtos/chat/chat-completion-response.dto';
+import { TelemetryUsecases } from '@/usecases/telemetry/telemetry.usecases';
+import {
+  EventName,
+  TelemetrySource,
+} from '@/domain/telemetry/telemetry.interface';
 
 @ApiTags('Inference')
 @Controller('chat')
 export class ChatController {
-  constructor(private readonly chatService: ChatUsecases) {}
+  constructor(
+    private readonly chatService: ChatUsecases,
+    private readonly telemetryUsecases: TelemetryUsecases,
+  ) {}
 
   @ApiOperation({
     summary: 'Create chat completion',
@@ -49,5 +57,9 @@ export class ChatController {
         res.status(error.statusCode ?? 400).send(error.message),
       );
     }
+    this.telemetryUsecases.addEventToQueue({
+      name: EventName.CHAT,
+      modelId: createChatDto.model,
+    });
   }
 }
diff --git a/cortex-js/src/infrastructure/controllers/models.controller.ts b/cortex-js/src/infrastructure/controllers/models.controller.ts
index fa7d50633..742b1b4ca 100644
--- a/cortex-js/src/infrastructure/controllers/models.controller.ts
+++ b/cortex-js/src/infrastructure/controllers/models.controller.ts
@@ -21,6 +21,10 @@ import { StartModelSuccessDto } from '@/infrastructure/dtos/models/start-model-s
 import { TransformInterceptor } from '../interceptors/transform.interceptor';
 import { CortexUsecases } from '@/usecases/cortex/cortex.usecases';
 import { ModelSettingsDto } from '../dtos/models/model-settings.dto';
+import {
+  EventName,
+} from '@/domain/telemetry/telemetry.interface';
+import { TelemetryUsecases } from '@/usecases/telemetry/telemetry.usecases';
 import { CommonResponseDto } from '../dtos/common/common-response.dto';
 
 @ApiTags('Models')
@@ -30,6 +34,7 @@ export class ModelsController {
   constructor(
     private readonly modelsUsecases: ModelsUsecases,
     private readonly cortexUsecases: CortexUsecases,
+    private readonly telemetryUsecases: TelemetryUsecases,
   ) {}
 
   @HttpCode(201)
@@ -111,9 +116,14 @@ export class ModelsController {
     ],
   })
   @Get('download/:modelId(*)')
-  downloadModel(@Param('modelId') modelId: string) {
-    this.modelsUsecases.pullModel(modelId, false);
+  async downloadModel(@Param('modelId') modelId: string) {
+    await this.modelsUsecases.pullModel(modelId, false);
+
+    this.telemetryUsecases.addEventToQueue({
+      name: EventName.DOWNLOAD_MODEL,
+      modelId,
+    });
 
     return {
       message: 'Download model started successfully.',
     };
@@ -153,9 +163,12 @@ export class ModelsController {
     description: 'The unique identifier of the model.',
   })
   @Get('pull/:modelId(*)')
-  pullModel(@Param('modelId') modelId: string) {
-    this.modelsUsecases.pullModel(modelId);
-
+  async pullModel(@Param('modelId') modelId: string) {
+    await this.modelsUsecases.pullModel(modelId);
+    this.telemetryUsecases.addEventToQueue({
+      name: EventName.DOWNLOAD_MODEL,
+      modelId,
+    });
     return {
       message: 'Download model started successfully.',
     };
diff --git a/cortex-js/src/infrastructure/repositories/telemetry/telemetry.repository.ts b/cortex-js/src/infrastructure/repositories/telemetry/telemetry.repository.ts
index b20562d96..8283037ec 100644
--- a/cortex-js/src/infrastructure/repositories/telemetry/telemetry.repository.ts
+++ b/cortex-js/src/infrastructure/repositories/telemetry/telemetry.repository.ts
@@ -10,6 +10,9 @@ import {
   TelemetryResource,
   Telemetry,
   TelemetryEventMetadata,
+  EventAttributes,
+  TelemetryAnonymized,
+  BenchmarkHardware,
 } from '@/domain/telemetry/telemetry.interface';
 import { Injectable } from '@nestjs/common';
 import { join } from 'path';
@@ -17,6 +20,7 @@ import packageJson from '@/../package.json';
 import axios from 'axios';
 import { telemetryServerUrl } from '@/infrastructure/constants/cortex';
 import { FileManagerService } from '@/infrastructure/services/file-manager/file-manager.service';
+import { ModelStat } from '@/infrastructure/commanders/types/model-stat.interface';
 
 // refactor using convert to dto
 @Injectable()
@@ -29,6 +33,7 @@ export class TelemetryRepositoryImpl implements TelemetryRepository {
   };
   private readonly crashReportFileName = 'crash-report.jsonl';
+  private readonly anonymizedDataFileName = 'session.json';
 
   constructor(private readonly fileManagerService: FileManagerService) {}
 
   private async getTelemetryDirectory(): Promise<string> {
@@ -45,19 +50,35 @@ export class TelemetryRepositoryImpl implements TelemetryRepository {
     return hash.digest('hex');
   }
 
-  async sendTelemetryToServer(telemetry: Telemetry) {
+  async sendTelemetryToServer(
+    telemetryEvent: Telemetry['event'],
+    type: 'crash-report' | 'metrics' = 'crash-report',
+  ): Promise<void> {
     try {
-      await axios.post(
-        `${telemetryServerUrl}/api/v1/crash-report`,
-        telemetry.event,
-        {
-          headers: {
-            'Content-Type': 'application/json',
-            'cortex-checksum': this.generateChecksum(telemetry.event),
-          },
+      await axios.post(`${telemetryServerUrl}/api/v1/${type}`, telemetryEvent, {
+        headers: {
+          'Content-Type': 'application/json',
+          'cortex-checksum': this.generateChecksum(telemetryEvent),
           timeout: 1000,
         },
-      );
+      });
+    } catch (error) {}
+  }
+
+  async sendBenchmarkToServer(data: {
+    hardware: BenchmarkHardware;
+    results: any;
+    metrics: any;
+    model: ModelStat;
+    sessionId: string;
+  }): Promise<void> {
+    try {
+      await axios.post(`${telemetryServerUrl}/api/v1/benchmark`, data, {
+        headers: {
+          'Content-Type': 'application/json',
+          'cortex-checksum': this.generateChecksum(data),
+        },
+      });
     } catch (error) {}
   }
 
@@ -118,10 +139,11 @@ export class TelemetryRepositoryImpl implements TelemetryRepository {
     crashReport: CrashReportAttributes,
     source: TelemetrySource,
   ): Promise<void> {
-    const telemetryEvent = await this.convertCrashReportToTelemetryEvent(
-      crashReport,
+    const telemetryEvent = await this.convertToTelemetryEvent({
+      attributes: crashReport,
       source,
-    );
+      type: 'crash-report',
+    });
     const metadata: TelemetryEventMetadata = {
       createdAt: new Date().toISOString(),
       sentAt: null,
@@ -137,11 +159,15 @@ export class TelemetryRepositoryImpl implements TelemetryRepository {
     );
   }
 
-  private async convertCrashReportToTelemetryEvent(
-    crashReport: CrashReportAttributes,
-    // move this to telemetryResource
-    source: TelemetrySource,
-  ): Promise<TelemetryEvent> {
+  private async convertToTelemetryEvent({
+    attributes,
+    source,
+    type,
+  }: {
+    attributes: CrashReportAttributes | EventAttributes;
+    source: TelemetrySource;
+    type: 'crash-report' | 'metrics';
+  }): Promise<TelemetryEvent> {
     const gpus = (await systemInformation.graphics()).controllers.map(
       ({ model, vendor, vram, vramDynamic }) => ({
         model,
@@ -150,9 +176,17 @@ export class TelemetryRepositoryImpl implements TelemetryRepository {
         vramDynamic,
       }),
     );
+
+    const body =
+      type === 'crash-report'
+        ? (attributes as CrashReportAttributes).message
+        : (attributes as EventAttributes).name;
+
+    const severity = type === 'crash-report' ? 'ERROR' : 'INFO';
+
     const resourceAttributes: Attribute[] = Object.entries({
       ...this.telemetryResource,
-      'service.name': 'crash-reporting',
+      'service.name': type,
       cpu: os.cpus()[0].model,
       gpus: JSON.stringify(gpus),
       source,
@@ -160,7 +194,7 @@ export class TelemetryRepositoryImpl implements TelemetryRepository {
       key,
       value: { stringValue: value },
     }));
-    const telemetryLogAttributes: Attribute[] = Object.entries(crashReport).map(
+    const telemetryLogAttributes: Attribute[] = Object.entries(attributes).map(
       ([key, value]) => {
         if (typeof value === 'object') {
           return {
@@ -195,8 +229,8 @@ export class TelemetryRepositoryImpl implements TelemetryRepository {
               startTimeUnixNano: (
                 BigInt(Date.now()) * BigInt(1000000)
               ).toString(),
-              body: { stringValue: crashReport.message },
-              severityText: 'ERROR',
+              body: { stringValue: body },
+              severityText: severity,
               attributes: telemetryLogAttributes,
             },
           ],
@@ -204,4 +238,45 @@ export class TelemetryRepositoryImpl implements TelemetryRepository {
       ],
     };
   }
+
+  async sendEvent(
+    events: EventAttributes[],
+    source: TelemetrySource,
+  ): Promise<void> {
+    const telemetryEvents = await Promise.all(
+      events.map(async (event) =>
+        this.convertToTelemetryEvent({
+          attributes: event,
+          source,
+          type: 'metrics',
+        }),
+      ),
+    );
+    await this.sendTelemetryToServer(
+      {
+        resourceLogs: telemetryEvents,
+      },
+      'metrics',
+    );
+  }
+
+  async getAnonymizedData(): Promise<TelemetryAnonymized | null> {
+    const content = await this.fileManagerService.readFile(
+      join(await this.getTelemetryDirectory(), this.anonymizedDataFileName),
+    );
+
+    if (!content) {
+      return null;
+    }
+
+    const data = JSON.parse(content) as TelemetryAnonymized;
+    return data;
+  }
+
+  async updateAnonymousData(data: TelemetryAnonymized): Promise<void> {
+    return this.fileManagerService.writeFile(
+      join(await this.getTelemetryDirectory(), this.anonymizedDataFileName),
+      JSON.stringify(data),
+    );
+  }
 }
diff --git a/cortex-js/src/infrastructure/services/file-manager/file-manager.service.ts b/cortex-js/src/infrastructure/services/file-manager/file-manager.service.ts
index e1ea41e92..a1d7a0735 100644
--- a/cortex-js/src/infrastructure/services/file-manager/file-manager.service.ts
+++ b/cortex-js/src/infrastructure/services/file-manager/file-manager.service.ts
@@ -256,4 +256,31 @@ export class FileManagerService {
       await promises.mkdir(folderPath, { recursive: true });
     }
   }
+
+  async readFile(filePath: string): Promise<string | null> {
+    try {
+      const isFileExist = existsSync(filePath);
+      if (!isFileExist) {
+        return null;
+      }
+      const content = await promises.readFile(filePath, {
+        encoding: 'utf8',
+      });
+      return content;
+    } catch (error) {
+      throw error;
+    }
+  }
+  async writeFile(filePath: string, data: any): Promise<void> {
+    try {
+      const dirPath = filePath.split('/').slice(0, -1).join('/');
+      await this.createFolderIfNotExistInDataFolder(dirPath);
+      return promises.writeFile(filePath, data, {
+        encoding: 'utf8',
+        flag: 'w+',
+      });
+    } catch (error) {
+      throw error;
+    }
+  }
 }
diff --git a/cortex-js/src/usecases/chat/chat.usecases.ts b/cortex-js/src/usecases/chat/chat.usecases.ts
index a3072652d..c5c132f46 100644
--- a/cortex-js/src/usecases/chat/chat.usecases.ts
+++ b/cortex-js/src/usecases/chat/chat.usecases.ts
@@ -28,7 +28,6 @@ export class ChatUsecases {
   ): Promise {
     const { model: modelId } = createChatDto;
     const model = await this.modelRepository.findOne(modelId);
-
     if (!model) {
       throw new ModelNotFoundException(modelId);
     }
diff --git a/cortex-js/src/usecases/telemetry/telemetry.usecases.ts b/cortex-js/src/usecases/telemetry/telemetry.usecases.ts
index 89c00eaa8..0a38db0b9 100644
--- a/cortex-js/src/usecases/telemetry/telemetry.usecases.ts
+++ b/cortex-js/src/usecases/telemetry/telemetry.usecases.ts
@@ -1,16 +1,27 @@
 import { TelemetryRepository } from '@/domain/repositories/telemetry.interface';
 import {
+  BenchmarkHardware,
   CrashReportAttributes,
+  EventAttributes,
+  EventName,
   Telemetry,
+  TelemetryAnonymized,
   TelemetrySource,
 } from '@/domain/telemetry/telemetry.interface';
+import { ModelStat } from '@/infrastructure/commanders/types/model-stat.interface';
 import { ContextService } from '@/infrastructure/services/context/context.service';
 import { HttpException, Inject, Injectable, Scope } from '@nestjs/common';
+import { v4 } from 'uuid';
 
 @Injectable({ scope: Scope.TRANSIENT })
 export class TelemetryUsecases {
   private readonly crashReports: string[] = [];
-  private readonly maxSize = 100;
+  private readonly maxSize = 10;
+  private metricQueue: EventAttributes[] = [];
+  private readonly maxQueueSize = 10;
+  private readonly flushInterval = 1000 * 5;
+  private interval: NodeJS.Timeout;
+  private lastActiveAt?: string | null;
 
   constructor(
     @Inject('TELEMETRY_REPOSITORY')
@@ -33,7 +44,6 @@ export class TelemetryUsecases {
         this.crashReports.shift();
       }
       this.crashReports.push(JSON.stringify(crashReport));
-
       await this.telemetryRepository.createCrashReport(crashReport, source);
     } catch (e) {}
     return;
@@ -46,7 +56,7 @@ export class TelemetryUsecases {
       const crashReport = await this.telemetryRepository.getLastCrashReport();
       if (crashReport && !crashReport.metadata.sentAt) {
         const promises = [
-          this.telemetryRepository.sendTelemetryToServer(crashReport),
+          this.telemetryRepository.sendTelemetryToServer(crashReport.event),
         ];
         const collectorEndpoint = process.env.CORTEX_EXPORTER_OLTP_ENDPOINT;
         if (collectorEndpoint) {
@@ -80,6 +90,7 @@ export class TelemetryUsecases {
         endpoint: this.contextService.get('endpoint') || '',
         command: this.contextService.get('command') || '',
       },
+      sessionId: this.contextService.get('sessionId') || '',
     };
   }
 
@@ -87,6 +98,14 @@ export class TelemetryUsecases {
     return process.env.CORTEX_CRASH_REPORT !== '0';
   }
 
+  private isMetricsEnabled(): boolean {
+    return process.env.CORTEX_METRICS !== '0';
+  }
+
+  private isBenchmarkEnabled(): boolean {
+    return process.env.CORTEX_BENCHMARK !== '0';
+  }
+
   private async catchException(): Promise<void> {
     process.on('uncaughtException', async (error: Error) => {
       await this.createCrashReport(
@@ -102,4 +121,115 @@ export class TelemetryUsecases {
       );
     });
   }
+
+  async initInterval(): Promise<void> {
+    this.interval = this.flushMetricQueueInterval();
+  }
+
+  async sendEvent(
+    events: EventAttributes[],
+    source: TelemetrySource,
+  ): Promise<void> {
+    try {
+      if (!this.isMetricsEnabled()) return;
+      const sessionId = (this.contextService.get('sessionId') as string) || '';
+      const sessionEvents = events.map((event) => ({
+        ...event,
+        sessionId,
+      }));
+      await this.telemetryRepository.sendEvent(sessionEvents, source);
+    } catch (e) {}
+  }
+
+  async sendActivationEvent(source: TelemetrySource): Promise<void> {
+    try {
+      if (!this.isMetricsEnabled()) return;
+      if (!this.lastActiveAt) {
+        const currentData = await this.telemetryRepository.getAnonymizedData();
+        this.lastActiveAt = currentData?.lastActiveAt;
+      }
+      const isActivatedToday =
+        this.lastActiveAt &&
+        new Date(this.lastActiveAt).getDate() === new Date().getDate();
+      if (isActivatedToday) return;
+      const isNewActivation = !this.lastActiveAt;
+      await this.sendEvent(
+        [
+          {
+            name: isNewActivation ? EventName.NEW_ACTIVATE : EventName.ACTIVATE,
+          },
+        ],
+        source,
+      );
+      this.lastActiveAt = new Date().toISOString();
+    } catch (e) {}
+    await this.updateAnonymousData(this.lastActiveAt);
+  }
+
+  async addEventToQueue(event: EventAttributes): Promise<void> {
+    if (!this.isMetricsEnabled()) return;
+    this.metricQueue.push({
+      ...event,
+      sessionId: this.contextService.get('sessionId') || '',
+    });
+    if (this.metricQueue.length >= this.maxQueueSize) {
+      await this.flushMetricQueue();
+    }
+  }
+
+  private async flushMetricQueue(): Promise<void> {
+    if (this.metricQueue.length > 0) {
+      clearInterval(this.interval);
+      await this.sendEvent(this.metricQueue, TelemetrySource.CORTEX_SERVER);
+      this.interval = this.flushMetricQueueInterval();
+      this.metricQueue = [];
+    }
+  }
+
+  private flushMetricQueueInterval(): NodeJS.Timeout {
+    return setInterval(() => {
+      this.flushMetricQueue();
+    }, this.flushInterval);
+  }
+
+  async updateAnonymousData(
+    lastActiveAt?: string | null,
+  ): Promise<TelemetryAnonymized | null> {
+    try {
+      const currentData = await this.telemetryRepository.getAnonymizedData();
+      const updatedData = {
+        ...currentData,
+        sessionId: currentData?.sessionId || v4(),
+        ...(lastActiveAt && { lastActiveAt }),
+      };
+      await this.telemetryRepository.updateAnonymousData(updatedData);
+      return updatedData;
+    } catch (e) {
+      return null;
+    }
+  }
+
+  async sendBenchmarkEvent({
+    hardware,
+    results,
+    metrics,
+    model,
+  }: {
+    hardware: BenchmarkHardware;
+    results: any;
+    metrics: any;
+    model: ModelStat;
+  }): Promise<void> {
+    try {
+      if (!this.isBenchmarkEnabled()) return;
+      const sessionId: string = this.contextService.get('sessionId') || '';
+      await this.telemetryRepository.sendBenchmarkToServer({
+        hardware,
+        results,
+        metrics,
+        model,
+        sessionId,
+      });
+    } catch (e) {}
+  }
 }