From 60aaa3ea113b832a5078acc9b8ceef1dd95262df Mon Sep 17 00:00:00 2001 From: axel7083 <42176370+axel7083@users.noreply.github.com> Date: Thu, 15 Feb 2024 14:40:53 +0100 Subject: [PATCH] chore: remove the dependency of download task to the RecipeStatusUtils (#340) * feat: splitting download system Signed-off-by: axel7083 <42176370+axel7083@users.noreply.github.com> * fix: prettier&linter Signed-off-by: axel7083 <42176370+axel7083@users.noreply.github.com> * fix: recipe update registry Signed-off-by: axel7083 <42176370+axel7083@users.noreply.github.com> * fix: prettier Signed-off-by: axel7083 <42176370+axel7083@users.noreply.github.com> --------- Signed-off-by: axel7083 <42176370+axel7083@users.noreply.github.com> --- .../src/managers/applicationManager.spec.ts | 33 ++-- .../src/managers/applicationManager.ts | 11 +- .../src/managers/modelsManager.spec.ts | 154 ++++++--------- .../backend/src/managers/modelsManager.ts | 179 ++++++------------ .../registries/RecipeStatusRegistry.spec.ts | 1 + .../src/registries/RecipeStatusRegistry.ts | 20 +- .../backend/src/registries/TaskRegistry.ts | 26 +++ packages/backend/src/studio.ts | 10 +- packages/backend/src/utils/downloader.ts | 130 +++++++++++++ packages/frontend/src/stores/recipe.ts | 18 +- packages/shared/Messages.ts | 1 + 11 files changed, 334 insertions(+), 249 deletions(-) create mode 100644 packages/backend/src/utils/downloader.ts diff --git a/packages/backend/src/managers/applicationManager.spec.ts b/packages/backend/src/managers/applicationManager.spec.ts index a97d8a148..e8ca3590c 100644 --- a/packages/backend/src/managers/applicationManager.spec.ts +++ b/packages/backend/src/managers/applicationManager.spec.ts @@ -41,6 +41,7 @@ import type { podStopHandle, startupHandle, } from './podmanConnection'; +import { TaskRegistry } from '../registries/TaskRegistry'; const mocks = vi.hoisted(() => { return { @@ -73,11 +74,21 @@ const mocks = vi.hoisted(() => { listPodsMock: vi.fn(), stopPodMock: vi.fn(), removePodMock: vi.fn(), + performDownloadMock: vi.fn(), + onEventDownloadMock: vi.fn(), }; }); vi.mock('../models/AIConfig', () => ({ parseYamlFile: mocks.parseYamlFileMock, })); + +vi.mock('../utils/downloader', () => ({ + Downloader: class { + onEvent = mocks.onEventDownloadMock; + perform = mocks.performDownloadMock; + }, +})); + vi.mock('@podman-desktop/api', () => ({ provider: { getContainerConnections: mocks.getContainerConnectionsMock, @@ -129,10 +140,6 @@ describe('pullApplication', () => { const cloneRepositoryMock = vi.fn(); let manager: ApplicationManager; let modelsManager: ModelsManager; - let doDownloadModelWrapperSpy: MockInstance< - [modelId: string, url: string, taskUtil: RecipeStatusUtils, destFileName?: string], - Promise - >; vi.spyOn(utils, 'timeout').mockResolvedValue(); function mockForPullApplication(options: mockForPullApplicationOptions) { vi.spyOn(os, 'homedir').mockReturnValue('/home/user'); @@ -210,6 +217,7 @@ describe('pullApplication', () => { }, } as CatalogManager, telemetryLogger, + new TaskRegistry({ postMessage: vi.fn().mockResolvedValue(undefined) } as unknown as Webview), ); manager = new ApplicationManager( '/home/user/aistudio', @@ -225,14 +233,13 @@ describe('pullApplication', () => { modelsManager, telemetryLogger, ); - doDownloadModelWrapperSpy = vi.spyOn(modelsManager, 'doDownloadModelWrapper'); } test('pullApplication should clone repository and call downloadModelMain and buildImage', async () => { mockForPullApplication({ recipeFolderExists: false, }); vi.spyOn(modelsManager, 'isModelOnDisk').mockReturnValue(false); - doDownloadModelWrapperSpy.mockResolvedValue('path'); + mocks.performDownloadMock.mockResolvedValue('path'); const recipe: Recipe = { id: 'recipe1', name: 'Recipe 1', @@ -270,7 +277,7 @@ describe('pullApplication', () => { gitCloneOptions.targetDirectory = '/home/user/aistudio/recipe1'; expect(cloneRepositoryMock).toHaveBeenNthCalledWith(1, gitCloneOptions); } - expect(doDownloadModelWrapperSpy).toHaveBeenCalledOnce(); + expect(mocks.performDownloadMock).toHaveBeenCalledOnce(); expect(mocks.buildImageMock).toHaveBeenCalledOnce(); expect(mocks.buildImageMock).toHaveBeenCalledWith( `${gitCloneOptions.targetDirectory}${path.sep}contextdir1`, @@ -283,11 +290,7 @@ describe('pullApplication', () => { }, }, ); - expect(mocks.logUsageMock).toHaveBeenNthCalledWith(1, 'model.download', { - 'model.id': 'model1', - durationSeconds: 99, - }); - expect(mocks.logUsageMock).toHaveBeenNthCalledWith(2, 'recipe.pull', { + expect(mocks.logUsageMock).toHaveBeenNthCalledWith(1, 'recipe.pull', { 'recipe.id': 'recipe1', 'recipe.name': 'Recipe 1', durationSeconds: 99, @@ -298,7 +301,7 @@ describe('pullApplication', () => { recipeFolderExists: true, }); vi.spyOn(modelsManager, 'isModelOnDisk').mockReturnValue(false); - vi.spyOn(modelsManager, 'doDownloadModelWrapper').mockResolvedValue('path'); + mocks.performDownloadMock.mockResolvedValue('path'); const recipe: Recipe = { id: 'recipe1', name: 'Recipe 1', @@ -348,7 +351,7 @@ describe('pullApplication', () => { }; await manager.pullApplication(recipe, model); expect(cloneRepositoryMock).not.toHaveBeenCalled(); - expect(doDownloadModelWrapperSpy).not.toHaveBeenCalled(); + expect(mocks.performDownloadMock).not.toHaveBeenCalled(); }); test('pullApplication should mark the loading config as error if not container are found', async () => { @@ -385,7 +388,7 @@ describe('pullApplication', () => { await expect(manager.pullApplication(recipe, model)).rejects.toThrowError('No containers available.'); expect(cloneRepositoryMock).not.toHaveBeenCalled(); - expect(doDownloadModelWrapperSpy).not.toHaveBeenCalled(); + expect(mocks.performDownloadMock).not.toHaveBeenCalled(); }); }); describe('doCheckout', () => { diff --git a/packages/backend/src/managers/applicationManager.ts b/packages/backend/src/managers/applicationManager.ts index 303669d3f..d9f784385 100644 --- a/packages/backend/src/managers/applicationManager.ts +++ b/packages/backend/src/managers/applicationManager.ts @@ -112,8 +112,17 @@ export class ApplicationManager { // and backend (that define which model supports) const configAndFilteredContainers = this.getConfigAndFilterContainers(recipe.config, localFolder, taskUtil); + // Create the task on the recipe (which will be propagated to the TaskRegistry + taskUtil.setTask({ + id: model.id, + state: 'loading', + name: `Downloading model ${model.name}`, + labels: { + 'model-pulling': model.id, + }, + }); // get model by downloading it or retrieving locally - const modelPath = await this.modelsManager.downloadModel(model, taskUtil); + const modelPath = await this.modelsManager.downloadModel(model); // build all images, one per container (for a basic sample we should have 2 containers = sample app + model service) const images = await this.buildImages( diff --git a/packages/backend/src/managers/modelsManager.spec.ts b/packages/backend/src/managers/modelsManager.spec.ts index e6ad6dfff..919e91d31 100644 --- a/packages/backend/src/managers/modelsManager.spec.ts +++ b/packages/backend/src/managers/modelsManager.spec.ts @@ -20,20 +20,20 @@ import { type MockInstance, beforeEach, describe, expect, test, vi } from 'vites import os from 'os'; import fs, { type Stats, type PathLike } from 'node:fs'; import path from 'node:path'; -import type { DownloadModelResult } from './modelsManager'; import { ModelsManager } from './modelsManager'; import type { TelemetryLogger, Webview } from '@podman-desktop/api'; import type { CatalogManager } from './catalogManager'; import type { ModelInfo } from '@shared/src/models/IModelInfo'; -import { RecipeStatusUtils } from '../utils/recipeStatusUtils'; -import type { RecipeStatusRegistry } from '../registries/RecipeStatusRegistry'; import * as utils from '../utils/utils'; +import { TaskRegistry } from '../registries/TaskRegistry'; const mocks = vi.hoisted(() => { return { showErrorMessageMock: vi.fn(), logUsageMock: vi.fn(), logErrorMock: vi.fn(), + performDownloadMock: vi.fn(), + onEventDownloadMock: vi.fn(), }; }); @@ -52,10 +52,14 @@ vi.mock('@podman-desktop/api', () => { }; }); -let setTaskMock: MockInstance; -let taskUtils: RecipeStatusUtils; -let setTaskStateMock: MockInstance; -let setTaskErrorMock: MockInstance; +vi.mock('../utils/downloader', () => ({ + Downloader: class { + onEvent = mocks.onEventDownloadMock; + perform = mocks.performDownloadMock; + }, +})); + +let taskRegistry: TaskRegistry; const telemetryLogger = { logUsage: mocks.logUsageMock, @@ -64,12 +68,7 @@ const telemetryLogger = { beforeEach(() => { vi.resetAllMocks(); - taskUtils = new RecipeStatusUtils('recipe', { - setStatus: vi.fn(), - } as unknown as RecipeStatusRegistry); - setTaskMock = vi.spyOn(taskUtils, 'setTask'); - setTaskStateMock = vi.spyOn(taskUtils, 'setTaskState'); - setTaskErrorMock = vi.spyOn(taskUtils, 'setTaskError'); + taskRegistry = new TaskRegistry({ postMessage: vi.fn().mockResolvedValue(undefined) } as unknown as Webview); }); const dirent = [ @@ -143,6 +142,7 @@ test('getModelsInfo should get models in local directory', async () => { }, } as CatalogManager, telemetryLogger, + taskRegistry, ); await manager.loadLocalModels(); expect(manager.getModelsInfo()).toEqual([ @@ -188,6 +188,7 @@ test('getModelsInfo should return an empty array if the models folder does not e }, } as CatalogManager, telemetryLogger, + taskRegistry, ); manager.getLocalModelsFromDisk(); expect(manager.getModelsInfo()).toEqual([]); @@ -224,6 +225,7 @@ test('getLocalModelsFromDisk should return undefined Date and size when stat fai }, } as CatalogManager, telemetryLogger, + taskRegistry, ); await manager.loadLocalModels(); expect(manager.getModelsInfo()).toEqual([ @@ -266,6 +268,7 @@ test('loadLocalModels should post a message with the message on disk and on cata }, } as CatalogManager, telemetryLogger, + taskRegistry, ); await manager.loadLocalModels(); expect(postMessageMock).toHaveBeenNthCalledWith(1, { @@ -311,6 +314,7 @@ test('deleteLocalModel deletes the model folder', async () => { }, } as CatalogManager, telemetryLogger, + taskRegistry, ); await manager.loadLocalModels(); await manager.deleteLocalModel('model-id-1'); @@ -360,6 +364,7 @@ test('deleteLocalModel fails to delete the model folder', async () => { }, } as CatalogManager, telemetryLogger, + taskRegistry, ); await manager.loadLocalModels(); await manager.deleteLocalModel('model-id-1'); @@ -390,34 +395,28 @@ test('deleteLocalModel fails to delete the model folder', async () => { }); describe('downloadModel', () => { - const manager = new ModelsManager( - 'appdir', - {} as Webview, - { - getModels(): ModelInfo[] { - return []; - }, - } as CatalogManager, - telemetryLogger, - ); test('download model if not already on disk', async () => { - vi.spyOn(manager, 'isModelOnDisk').mockReturnValue(false); - const doDownloadModelWrapperMock = vi - .spyOn(manager, 'doDownloadModelWrapper') - .mockImplementation((_modelId: string, _url: string, _taskUtil: RecipeStatusUtils, _destFileName?: string) => { - return Promise.resolve(''); - }); - vi.spyOn(utils, 'getDurationSecondsSince').mockReturnValue(99); - await manager.downloadModel( + const manager = new ModelsManager( + 'appdir', + {} as Webview, { - id: 'id', - url: 'url', - name: 'name', - } as ModelInfo, - taskUtils, + getModels(): ModelInfo[] { + return []; + }, + } as CatalogManager, + telemetryLogger, + taskRegistry, ); - expect(doDownloadModelWrapperMock).toBeCalledWith('id', 'url', taskUtils); - expect(setTaskMock).toHaveBeenLastCalledWith({ + + vi.spyOn(manager, 'isModelOnDisk').mockReturnValue(false); + vi.spyOn(utils, 'getDurationSecondsSince').mockReturnValue(99); + const setMock = vi.spyOn(taskRegistry, 'set'); + await manager.downloadModel({ + id: 'id', + url: 'url', + name: 'name', + } as ModelInfo); + expect(setMock).toHaveBeenLastCalledWith({ id: 'id', name: 'Downloading model name', labels: { @@ -425,21 +424,29 @@ describe('downloadModel', () => { }, state: 'loading', }); - expect(mocks.logUsageMock).toHaveBeenNthCalledWith(1, 'model.download', { 'model.id': 'id', durationSeconds: 99 }); }); test('retrieve model path if already on disk', async () => { - vi.spyOn(manager, 'isModelOnDisk').mockReturnValue(true); - const getLocalModelPathMock = vi.spyOn(manager, 'getLocalModelPath').mockReturnValue(''); - await manager.downloadModel( + const manager = new ModelsManager( + 'appdir', + {} as Webview, { - id: 'id', - url: 'url', - name: 'name', - } as ModelInfo, - taskUtils, + getModels(): ModelInfo[] { + return []; + }, + } as CatalogManager, + telemetryLogger, + taskRegistry, ); + const setMock = vi.spyOn(taskRegistry, 'set'); + vi.spyOn(manager, 'isModelOnDisk').mockReturnValue(true); + const getLocalModelPathMock = vi.spyOn(manager, 'getLocalModelPath').mockReturnValue(''); + await manager.downloadModel({ + id: 'id', + url: 'url', + name: 'name', + } as ModelInfo); expect(getLocalModelPathMock).toBeCalledWith('id'); - expect(setTaskMock).toHaveBeenLastCalledWith({ + expect(setMock).toHaveBeenLastCalledWith({ id: 'id', name: 'Model name already present on disk', labels: { @@ -449,54 +456,3 @@ describe('downloadModel', () => { }); }); }); - -describe('doDownloadModelWrapper', () => { - const manager = new ModelsManager( - 'appdir', - {} as Webview, - { - getModels(): ModelInfo[] { - return []; - }, - } as CatalogManager, - telemetryLogger, - ); - test('returning model path if model has been downloaded', async () => { - vi.spyOn(manager, 'doDownloadModel').mockImplementation( - ( - _modelId: string, - _url: string, - _taskUtil: RecipeStatusUtils, - callback: (message: DownloadModelResult) => void, - _destFileName?: string, - ) => { - callback({ - successful: true, - path: 'path', - }); - }, - ); - setTaskStateMock.mockReturnThis(); - const result = await manager.doDownloadModelWrapper('id', 'url', taskUtils); - expect(result).toBe('path'); - }); - test('rejecting with error message if model has NOT been downloaded', async () => { - vi.spyOn(manager, 'doDownloadModel').mockImplementation( - ( - _modelId: string, - _url: string, - _taskUtil: RecipeStatusUtils, - callback: (message: DownloadModelResult) => void, - _destFileName?: string, - ) => { - callback({ - successful: false, - error: 'error', - }); - }, - ); - setTaskStateMock.mockReturnThis(); - setTaskErrorMock.mockReturnThis(); - await expect(manager.doDownloadModelWrapper('id', 'url', taskUtils)).rejects.toThrowError('error'); - }); -}); diff --git a/packages/backend/src/managers/modelsManager.ts b/packages/backend/src/managers/modelsManager.ts index 9aa96d68d..a5cd7274c 100644 --- a/packages/backend/src/managers/modelsManager.ts +++ b/packages/backend/src/managers/modelsManager.ts @@ -18,15 +18,15 @@ import type { LocalModelInfo } from '@shared/src/models/ILocalModelInfo'; import fs from 'fs'; -import * as https from 'node:https'; import * as path from 'node:path'; import { type Webview, fs as apiFs } from '@podman-desktop/api'; import { MSG_NEW_MODELS_STATE } from '@shared/Messages'; import type { CatalogManager } from './catalogManager'; import type { ModelInfo } from '@shared/src/models/IModelInfo'; import * as podmanDesktopApi from '@podman-desktop/api'; -import type { RecipeStatusUtils } from '../utils/recipeStatusUtils'; -import { getDurationSecondsSince } from '../utils/utils'; +import { Downloader, type DownloadEvent, isCompletionEvent, isProgressEvent } from '../utils/downloader'; +import type { TaskRegistry } from '../registries/TaskRegistry'; +import type { Task } from '@shared/src/models/ITask'; export type DownloadModelResult = DownloadModelSuccessfulResult | DownloadModelFailureResult; @@ -50,6 +50,7 @@ export class ModelsManager { private webview: Webview, private catalogManager: CatalogManager, private telemetry: podmanDesktopApi.TelemetryLogger, + private taskRegistry: TaskRegistry, ) { this.#modelsDir = path.join(this.appUserDirectory, 'models'); this.#models = new Map(); @@ -177,134 +178,72 @@ export class ModelsManager { } } - async downloadModel(model: ModelInfo, taskUtil: RecipeStatusUtils) { - if (!this.isModelOnDisk(model.id)) { - // Download model - taskUtil.setTask({ - id: model.id, - state: 'loading', - name: `Downloading model ${model.name}`, - labels: { - 'model-pulling': model.id, - }, - }); - - const startTime = performance.now(); - try { - const result = await this.doDownloadModelWrapper(model.id, model.url, taskUtil); - const durationSeconds = getDurationSecondsSince(startTime); - this.telemetry.logUsage('model.download', { 'model.id': model.id, durationSeconds }); - return result; - } catch (e) { - console.error(e); - taskUtil.setTask({ - id: model.id, - state: 'error', - name: `Downloading model ${model.name}`, - labels: { - 'model-pulling': model.id, - }, - }); - const durationSeconds = getDurationSecondsSince(startTime); - this.telemetry.logError('model.download', { - 'model.id': model.id, - message: 'error downloading model', - error: e, - durationSeconds, - }); - throw e; - } - } else { - taskUtil.setTask({ - id: model.id, - state: 'success', - name: `Model ${model.name} already present on disk`, - labels: { - 'model-pulling': model.id, - }, - }); + async downloadModel(model: ModelInfo): Promise { + const task: Task = this.taskRegistry.get(model.id) || { + id: model.id, + state: 'loading', + name: `Downloading model ${model.name}`, + labels: { + 'model-pulling': model.id, + }, + }; + + // Check if the model is already on disk. + if (this.isModelOnDisk(model.id)) { + task.state = 'success'; + task.name = `Model ${model.name} already present on disk`; + this.taskRegistry.set(task); // update task + + // return model path return this.getLocalModelPath(model.id); } - } - doDownloadModelWrapper( - modelId: string, - url: string, - taskUtil: RecipeStatusUtils, - destFileName?: string, - ): Promise { - return new Promise((resolve, reject) => { - const downloadCallback = (result: DownloadModelResult) => { - if (result.successful === true) { - taskUtil.setTaskState(modelId, 'success'); - resolve(result.path); - } else if (result.successful === false) { - taskUtil.setTaskError(modelId, result.error); - reject(result.error); - } - }; - - this.doDownloadModel(modelId, url, taskUtil, downloadCallback, destFileName); - }); - } + // update task to loading state + this.taskRegistry.set(task); - doDownloadModel( - modelId: string, - url: string, - taskUtil: RecipeStatusUtils, - callback: (message: DownloadModelResult) => void, - destFileName?: string, - ) { - const destDir = path.join(this.appUserDirectory, 'models', modelId); + // Ensure path to model directory exist + const destDir = path.join(this.appUserDirectory, 'models', model.id); if (!fs.existsSync(destDir)) { fs.mkdirSync(destDir, { recursive: true }); } - if (!destFileName) { - destFileName = path.basename(url); - } - const destFile = path.resolve(destDir, destFileName); - const file = fs.createWriteStream(destFile); - let totalFileSize = 0; - let progress = 0; - https.get(url, resp => { - if (resp.headers.location) { - this.doDownloadModel(modelId, resp.headers.location, taskUtil, callback, destFileName); - return; - } else { - if (totalFileSize === 0 && resp.headers['content-length']) { - totalFileSize = parseFloat(resp.headers['content-length']); - } - } - let previousProgressValue = -1; - resp.on('data', chunk => { - progress += chunk.length; - const progressValue = (progress * 100) / totalFileSize; + const target = path.resolve(destDir, path.basename(model.url)); + // Create a downloader + const downloader = new Downloader(model.url, target); + + // Capture downloader events + downloader.onEvent((event: DownloadEvent) => { + if (isProgressEvent(event)) { + task.state = 'loading'; + task.progress = event.value; + } else if (isCompletionEvent(event)) { + // status error or canceled + if (event.status === 'error' || event.status === 'canceled') { + task.state = 'error'; + task.progress = undefined; + task.error = event.message; + + // telemetry usage + this.telemetry.logError('model.download', { + 'model.id': model.id, + message: 'error downloading model', + error: event.message, + durationSeconds: event.duration, + }); + } else { + task.state = 'success'; + task.progress = 100; - if (progressValue === 100 || progressValue - previousProgressValue > 1) { - previousProgressValue = progressValue; - taskUtil.setTaskProgress(modelId, progressValue); + // telemetry usage + this.telemetry.logUsage('model.download', { 'model.id': model.id, durationSeconds: event.duration }); } + } - // send progress in percentage (ex. 1.2%, 2.6%, 80.1%) to frontend - //this.sendProgress(progressValue); - if (progressValue === 100) { - callback({ - successful: true, - path: destFile, - }); - } - }); - file.on('finish', () => { - file.close(); - }); - file.on('error', e => { - callback({ - successful: false, - error: e.message, - }); - }); - resp.pipe(file); + this.taskRegistry.set(task); // update task }); + + // perform download + await downloader.perform(); + return target; } } diff --git a/packages/backend/src/registries/RecipeStatusRegistry.spec.ts b/packages/backend/src/registries/RecipeStatusRegistry.spec.ts index 9ad21eee1..738cfb415 100644 --- a/packages/backend/src/registries/RecipeStatusRegistry.spec.ts +++ b/packages/backend/src/registries/RecipeStatusRegistry.spec.ts @@ -36,6 +36,7 @@ const webview = { beforeEach(() => { vi.resetAllMocks(); + mocks.postMessageMock.mockResolvedValue(undefined); }); test('recipe status registry should start without any statuses', () => { diff --git a/packages/backend/src/registries/RecipeStatusRegistry.ts b/packages/backend/src/registries/RecipeStatusRegistry.ts index e614bec27..d9e7a08ba 100644 --- a/packages/backend/src/registries/RecipeStatusRegistry.ts +++ b/packages/backend/src/registries/RecipeStatusRegistry.ts @@ -31,13 +31,11 @@ export class RecipeStatusRegistry { setStatus(recipeId: string, status: RecipeStatus) { // Update the TaskRegistry - if (status.tasks && status.tasks.length > 0) { + if (status.tasks) { status.tasks.map(task => this.taskRegistry.set(task)); } this.statuses.set(recipeId, status); - this.dispatchState().catch((err: unknown) => { - console.error('error dispatching recipe statuses', err); - }); // we don't want to wait + this.notify(); } getStatus(recipeId: string): RecipeStatus | undefined { @@ -48,10 +46,14 @@ export class RecipeStatusRegistry { return this.statuses; } - private async dispatchState() { - await this.webview.postMessage({ - id: MSG_NEW_RECIPE_STATE, - body: this.statuses, - }); + private notify() { + this.webview + .postMessage({ + id: MSG_NEW_RECIPE_STATE, + body: this.statuses, + }) + .catch((err: unknown) => { + console.error('error notifying recipe statuses', err); + }); } } diff --git a/packages/backend/src/registries/TaskRegistry.ts b/packages/backend/src/registries/TaskRegistry.ts index 31bea2a72..5e729d1b1 100644 --- a/packages/backend/src/registries/TaskRegistry.ts +++ b/packages/backend/src/registries/TaskRegistry.ts @@ -17,15 +17,41 @@ ***********************************************************************/ import type { Task } from '@shared/src/models/ITask'; +import { MSG_TASKS_UPDATE } from '@shared/Messages'; +import type { Webview } from '@podman-desktop/api'; export class TaskRegistry { private tasks: Map = new Map(); + constructor(private webview: Webview) {} + + get(id: string): Task | undefined { + if (this.tasks.has(id)) return this.tasks.get(id); + return undefined; + } + set(task: Task) { this.tasks.set(task.id, task); + this.notify(); } delete(taskId: string) { this.tasks.delete(taskId); + this.notify(); + } + + getTasks(): Task[] { + return Array.from(this.tasks.values()); + } + + private notify() { + this.webview + .postMessage({ + id: MSG_TASKS_UPDATE, + body: this.getTasks(), + }) + .catch((err: unknown) => { + console.error('error notifying tasks', err); + }); } } diff --git a/packages/backend/src/studio.ts b/packages/backend/src/studio.ts index 38d9f078e..c48e39f75 100644 --- a/packages/backend/src/studio.ts +++ b/packages/backend/src/studio.ts @@ -117,7 +117,7 @@ export class Studio { const gitManager = new GitManager(); const podmanConnection = new PodmanConnection(); - const taskRegistry = new TaskRegistry(); + const taskRegistry = new TaskRegistry(this.#panel.webview); const recipeStatusRegistry = new RecipeStatusRegistry(taskRegistry, this.#panel.webview); this.playgroundManager = new PlayGroundManager( this.#panel.webview, @@ -127,7 +127,13 @@ export class Studio { ); // Create catalog manager, responsible for loading the catalog files and watching for changes this.catalogManager = new CatalogManager(appUserDirectory, this.#panel.webview); - this.modelsManager = new ModelsManager(appUserDirectory, this.#panel.webview, this.catalogManager, this.telemetry); + this.modelsManager = new ModelsManager( + appUserDirectory, + this.#panel.webview, + this.catalogManager, + this.telemetry, + taskRegistry, + ); const applicationManager = new ApplicationManager( appUserDirectory, gitManager, diff --git a/packages/backend/src/utils/downloader.ts b/packages/backend/src/utils/downloader.ts new file mode 100644 index 000000000..3ad18bf6c --- /dev/null +++ b/packages/backend/src/utils/downloader.ts @@ -0,0 +1,130 @@ +import { getDurationSecondsSince } from './utils'; +import fs from 'fs'; +import https from 'node:https'; +import { EventEmitter, type Event } from '@podman-desktop/api'; + +export interface DownloadEvent { + status: 'error' | 'completed' | 'progress' | 'canceled'; + message?: string; +} + +export interface CompletionEvent extends DownloadEvent { + status: 'completed' | 'error' | 'canceled'; + duration: number; +} + +export interface ProgressEvent extends DownloadEvent { + status: 'progress'; + value: number; +} + +export const isCompletionEvent = (value: unknown): value is CompletionEvent => { + return ( + !!value && + typeof value === 'object' && + 'status' in value && + typeof value['status'] === 'string' && + ['canceled', 'completed', 'error'].includes(value['status']) && + 'duration' in value + ); +}; + +export const isProgressEvent = (value: unknown): value is ProgressEvent => { + return ( + !!value && typeof value === 'object' && 'status' in value && value['status'] === 'progress' && 'value' in value + ); +}; + +export class Downloader { + private readonly _onEvent = new EventEmitter(); + readonly onEvent: Event = this._onEvent.event; + + constructor( + private url: string, + private target: string, + private abortSignal?: AbortSignal, + ) {} + + async perform() { + const startTime = performance.now(); + + try { + await this.download(this.url); + const durationSeconds = getDurationSecondsSince(startTime); + this._onEvent.fire({ + status: 'completed', + message: `Duration ${durationSeconds}s.`, + duration: durationSeconds, + } as CompletionEvent); + } catch (err: unknown) { + if (!this.abortSignal?.aborted) { + this._onEvent.fire({ + status: 'error', + message: `Something went wrong: ${String(err)}.`, + }); + } else { + this._onEvent.fire({ + status: 'canceled', + message: `Request cancelled: ${String(err)}.`, + }); + } + } + } + + private download(url: string): Promise { + return new Promise((resolve, reject) => { + const callback = (result: { ok: boolean; error?: string }) => { + if (result.ok) { + resolve(); + } else { + reject(result.error); + } + }; + this.followRedirects(url, callback); + }); + } + + private followRedirects(url: string, callback: (message: { ok?: boolean; error?: string }) => void) { + const file = fs.createWriteStream(this.target); + let totalFileSize = 0; + let progress = 0; + https.get(url, { signal: this.abortSignal }, resp => { + if (resp.headers.location) { + this.followRedirects(resp.headers.location, callback); + return; + } else { + if (totalFileSize === 0 && resp.headers['content-length']) { + totalFileSize = parseFloat(resp.headers['content-length']); + } + } + + let previousProgressValue = -1; + resp.on('data', chunk => { + progress += chunk.length; + const progressValue = (progress * 100) / totalFileSize; + + if (progressValue === 100 || progressValue - previousProgressValue > 1) { + previousProgressValue = progressValue; + this._onEvent.fire({ + status: 'progress', + value: progressValue, + } as ProgressEvent); + } + + // send progress in percentage (ex. 1.2%, 2.6%, 80.1%) to frontend + if (progressValue === 100) { + callback({ ok: true }); + } + }); + file.on('finish', () => { + file.close(); + }); + file.on('error', e => { + callback({ + error: e.message, + }); + }); + resp.pipe(file); + }); + } +} diff --git a/packages/frontend/src/stores/recipe.ts b/packages/frontend/src/stores/recipe.ts index b4d7d400c..c63a84665 100644 --- a/packages/frontend/src/stores/recipe.ts +++ b/packages/frontend/src/stores/recipe.ts @@ -1,6 +1,6 @@ import type { Readable } from 'svelte/store'; import { derived, readable } from 'svelte/store'; -import { MSG_NEW_RECIPE_STATE } from '@shared/Messages'; +import { MSG_NEW_RECIPE_STATE, MSG_TASKS_UPDATE } from '@shared/Messages'; import { rpcBrowser, studioClient } from '/@/utils/client'; import type { RecipeStatus } from '@shared/src/models/IRecipeStatus'; @@ -10,12 +10,24 @@ export const recipes: Readable> = readable { set(msg); }); + + const pull = () => { + studioClient.getPullingStatuses().then(state => { + set(state); + }); + }; + // Initialize the store manually - studioClient.getPullingStatuses().then(state => { - set(state); + pull(); + + // when the tasks are updated we pull the recipe updates + const tasks = rpcBrowser.subscribe(MSG_TASKS_UPDATE, _ => { + pull(); }); + return () => { sub.unsubscribe(); + tasks.unsubscribe(); }; }, ); diff --git a/packages/shared/Messages.ts b/packages/shared/Messages.ts index ef748c6cc..92f68dea7 100644 --- a/packages/shared/Messages.ts +++ b/packages/shared/Messages.ts @@ -2,6 +2,7 @@ export const MSG_PLAYGROUNDS_STATE_UPDATE = 'playgrounds-state-update'; export const MSG_NEW_PLAYGROUND_QUERIES_STATE = 'new-playground-queries-state'; export const MSG_NEW_CATALOG_STATE = 'new-catalog-state'; export const MSG_NEW_RECIPE_STATE = 'new-recipe-state'; +export const MSG_TASKS_UPDATE = 'tasks-update'; export const MSG_NEW_MODELS_STATE = 'new-models-state'; export const MSG_ENVIRONMENTS_STATE_UPDATE = 'environments-state-update';