diff --git a/examples/docs-examples/examples/composing-tasks/batch-endstream-chunks.mjs b/examples/docs-examples/examples/composing-tasks/batch-endstream-chunks.mjs index fc6b980d8..9d70ea397 100644 --- a/examples/docs-examples/examples/composing-tasks/batch-endstream-chunks.mjs +++ b/examples/docs-examples/examples/composing-tasks/batch-endstream-chunks.mjs @@ -6,7 +6,7 @@ import { TaskExecutor } from "@golem-sdk/golem-js"; yagnaOptions: { apiKey: "try_golem" }, }); - const result = await executor.run(async (ctx) => { + await executor.run(async (ctx) => { const res = await ctx .beginBatch() .uploadFile("./worker.mjs", "/golem/input/worker.mjs") @@ -17,6 +17,7 @@ import { TaskExecutor } from "@golem-sdk/golem-js"; res.on("data", (data) => (data.index == 2 ? console.log(data.stdout) : "")); res.on("error", (error) => console.error(error)); - res.on("close", () => executor.shutdown()); + return new Promise((resolve) => res.on("close", resolve)); }); + await executor.shutdown(); })(); diff --git a/examples/docs-examples/examples/working-with-results/multi-command-endstream.mjs b/examples/docs-examples/examples/working-with-results/multi-command-endstream.mjs index f68600b3d..3d187092a 100644 --- a/examples/docs-examples/examples/working-with-results/multi-command-endstream.mjs +++ b/examples/docs-examples/examples/working-with-results/multi-command-endstream.mjs @@ -6,7 +6,7 @@ import { TaskExecutor } from "@golem-sdk/golem-js"; yagnaOptions: { apiKey: "try_golem" }, }); - const result = await executor.run(async (ctx) => { + await executor.run(async (ctx) => { const res = await ctx .beginBatch() .uploadFile("./worker.mjs", "/golem/input/worker.mjs") @@ -17,6 +17,7 @@ import { TaskExecutor } from "@golem-sdk/golem-js"; res.on("data", (result) => console.log(result)); res.on("error", (error) => console.error(error)); - res.on("close", () => executor.shutdown()); + return new Promise((resolve) => res.on("close", resolve)); }); + await executor.shutdown(); })(); diff --git a/src/executor/executor.ts b/src/executor/executor.ts index 452f796db..8ea172d6b 100644 --- a/src/executor/executor.ts +++ b/src/executor/executor.ts @@ -4,7 +4,6 @@ import { AgreementPoolService } from "../agreement"; import { Task, TaskQueue, TaskService, Worker, TaskOptions } from "../task"; import { PaymentService, PaymentOptions } from "../payment"; import { NetworkService } from "../network"; -import { Result } from "../activity"; import { sleep, Logger, LogLevel, runtimeContextChecker, Yagna } from "../utils"; import { StorageProvider, GftpStorageProvider, NullStorageProvider, WebSocketBrowserStorageProvider } from "../storage"; import { ExecutorConfig } from "./config"; @@ -116,8 +115,8 @@ export class TaskExecutor { private paymentService: PaymentService; private networkService?: NetworkService; private statsService: StatsService; - private activityReadySetupFunctions: Worker[] = []; - private taskQueue: TaskQueue>; + private activityReadySetupFunctions: Worker[] = []; + private taskQueue: TaskQueue; private storageProvider?: StorageProvider; private logger?: Logger; private lastTaskIndex = 0; @@ -188,7 +187,7 @@ export class TaskExecutor { this.logger = this.options.logger; this.yagna = new Yagna(this.configOptions.yagnaOptions); const yagnaApi = this.yagna.getApi(); - this.taskQueue = new TaskQueue>(); + this.taskQueue = new TaskQueue(); this.agreementPoolService = new AgreementPoolService(yagnaApi, this.options); this.paymentService = new PaymentService(yagnaApi, this.options); this.marketService = new MarketService(this.agreementPoolService, yagnaApi, this.options); @@ -352,7 +351,7 @@ export class TaskExecutor { * }); * ``` */ - beforeEach(worker: Worker) { + beforeEach(worker: Worker) { this.activityReadySetupFunctions = [worker]; } @@ -377,7 +376,7 @@ export class TaskExecutor { * }); * ``` */ - onActivityReady(worker: Worker) { + onActivityReady(worker: Worker) { this.activityReadySetupFunctions.push(worker); } @@ -392,11 +391,8 @@ export class TaskExecutor { * await executor.run(async (ctx) => console.log((await ctx.run("echo 'Hello World'")).stdout)); * ``` */ - async run( - worker: Worker, - options?: TaskOptions, - ): Promise { - return this.executeTask(worker, undefined, options); + async run(worker: Worker, options?: TaskOptions): Promise { + return this.executeTask(worker, options); } private async createPackage( @@ -414,24 +410,21 @@ export class TaskExecutor { return packageInstance; } - private async executeTask( - worker: Worker, - data?: InputType, - options?: TaskOptions, - ): Promise { - const task = new Task((++this.lastTaskIndex).toString(), worker, data, { + private async executeTask(worker: Worker, options?: TaskOptions): Promise { + const task = new Task((++this.lastTaskIndex).toString(), worker, { maxRetries: options?.maxRetries ?? this.options.maxTaskRetries, timeout: options?.timeout ?? this.options.taskTimeout, activityReadySetupFunctions: this.activityReadySetupFunctions, }); - this.taskQueue.addToEnd(task as Task); + this.taskQueue.addToEnd(task); while (this.isRunning) { if (task.isFinished()) { if (task.isRejected()) throw task.getError(); - return task.getResults(); + return task.getResults() as OutputType; } await sleep(2000, true); } + throw new Error("Task executor has been stopped"); } /** @@ -453,14 +446,12 @@ export class TaskExecutor { * const error = await job.fetchError(); * ``` */ - public async createJob( - worker: Worker, - ): Promise> { + public async createJob(worker: Worker): Promise> { const jobId = v4(); const job = new Job(jobId, this.options.jobStorage); await job.saveInitialState(); - const task = new Task(jobId, worker, undefined, { + const task = new Task(jobId, worker, { maxRetries: this.options.maxTaskRetries, timeout: this.options.taskTimeout, activityReadySetupFunctions: this.activityReadySetupFunctions, @@ -468,7 +459,7 @@ export class TaskExecutor { task.onStateChange((taskState) => { job.saveState(taskState, task.getResults(), task.getError()); }); - this.taskQueue.addToEnd(task as Task); + this.taskQueue.addToEnd(task); return job; } diff --git a/src/golem_network/golem_network.ts b/src/golem_network/golem_network.ts index d68386712..3a11cd229 100644 --- a/src/golem_network/golem_network.ts +++ b/src/golem_network/golem_network.ts @@ -28,7 +28,7 @@ export interface GolemNetworkConfig { /** * Function that will be run before each job. You can use it to set up the environment for your job. For example, you can upload a file to the provider. */ - beforeEachJob?: Worker; + beforeEachJob?: Worker; /** * Job storage. By default Golem Network uses a simple in-memory storage for job statuses and results. In a real application you should use some persistent storage (e.g. a database). See {@link JobStorage} for more information. */ @@ -98,7 +98,7 @@ export class GolemNetwork { * console.log(status); * ``` */ - public async createJob(worker: Worker) { + public async createJob(worker: Worker) { return this.executor.createJob(worker); } @@ -113,7 +113,7 @@ export class GolemNetwork { * @param worker Worker function to run * @returns Worker function result */ - public async runTask(worker: Worker) { + public async runTask(worker: Worker) { return this.executor.run(worker); } diff --git a/src/index.ts b/src/index.ts index ea0ebfd89..80423bc8c 100755 --- a/src/index.ts +++ b/src/index.ts @@ -16,3 +16,4 @@ export { Logger, LogLevel, jsonLogger, nullLogger, consoleLogger, pinoLogger, de export { Yagna } from "./utils/yagna/yagna"; export { Job, JobStorage, JobState } from "./job"; export { GolemNetwork, GolemNetworkConfig } from "./golem_network"; +export { Worker, WorkContext } from "./task"; diff --git a/src/task/queue.ts b/src/task/queue.ts index f4c1c4892..0a28059ba 100644 --- a/src/task/queue.ts +++ b/src/task/queue.ts @@ -1,3 +1,5 @@ +import { Task } from "./task"; + /** * @internal */ @@ -8,15 +10,15 @@ export interface QueueableTask { /** * @internal */ -export class TaskQueue { - protected itemsStack: Array = []; +export class TaskQueue { + protected itemsStack: Array = []; - addToEnd(task: Task) { + addToEnd(task: T) { this._checkIfTaskIsEligibleForAdd(task); this.itemsStack.push(task); } - addToBegin(task: Task) { + addToBegin(task: T) { this._checkIfTaskIsEligibleForAdd(task); this.itemsStack.unshift(task); } @@ -25,11 +27,11 @@ export class TaskQueue { return this.itemsStack.length; } - get(): Task | undefined { + get(): T | undefined { return this.itemsStack.shift(); } - private _checkIfTaskIsEligibleForAdd(task: Task) { + private _checkIfTaskIsEligibleForAdd(task: T) { if (!task.isQueueable()) throw new Error("You cannot add a task that is not in the correct state"); } } diff --git a/src/task/service.ts b/src/task/service.ts index a522b369a..41d42c951 100644 --- a/src/task/service.ts +++ b/src/task/service.ts @@ -34,7 +34,7 @@ export class TaskService { constructor( private yagnaApi: YagnaApi, - private tasksQueue: TaskQueue>, + private tasksQueue: TaskQueue, private agreementPoolService: AgreementPoolService, private paymentService: PaymentService, private networkService?: NetworkService, @@ -72,7 +72,7 @@ export class TaskService { private async startTask(task: Task) { task.start(); - this.logger?.debug(`Starting task. ID: ${task.id}, Data: ${task.getData()}`); + this.logger?.debug(`Starting task. ID: ${task.id}`); ++this.activeTasksCount; const agreement = await this.agreementPoolService.getAgreement(); @@ -89,17 +89,12 @@ export class TaskService { }), ); - this.logger?.info( - `Task ${task.id} sent to provider ${agreement.provider.name}.${ - task.getData() ? " Data: " + task.getData() : "" - }`, - ); + this.logger?.info(`Task ${task.id} sent to provider ${agreement.provider.name}.`); this.paymentService.acceptDebitNotes(agreement.id); this.paymentService.acceptPayments(agreement); const activityReadySetupFunctions = task.getActivityReadySetupFunctions(); const worker = task.getWorker(); - const data = task.getData(); const networkNode = await this.networkService?.addNode(agreement.provider.id); const ctx = new WorkContext(activity, { activityReadySetupFunctions: this.activitySetupDone.has(activity.id) ? [] : activityReadySetupFunctions, @@ -115,14 +110,10 @@ export class TaskService { this.activitySetupDone.add(activity.id); this.logger?.debug(`Activity setup completed in activity ${activity.id}`); } - const results = await worker(ctx, data); + const results = await worker(ctx); task.stop(results); this.options.eventTarget?.dispatchEvent(new Events.TaskFinished({ id: task.id })); - this.logger?.info( - `Task ${task.id} computed by provider ${agreement.provider.name}.${ - task.getData() ? " Data: " + task.getData() : "" - }`, - ); + this.logger?.info(`Task ${task.id} computed by provider ${agreement.provider.name}.`); } catch (error) { task.stop(undefined, error); const reason = error?.response?.data?.message || error.message || error.toString(); diff --git a/src/task/task.ts b/src/task/task.ts index b20d61444..0aca9c739 100644 --- a/src/task/task.ts +++ b/src/task/task.ts @@ -15,7 +15,7 @@ export type TaskOptions = { /** timeout in ms for task execution, including retries, default = 300_000 (5min) */ timeout?: number; /** array of setup functions to run on each activity */ - activityReadySetupFunctions?: Worker[]; + activityReadySetupFunctions?: Worker[]; }; const DEFAULTS = { @@ -28,7 +28,7 @@ const DEFAULTS = { * * @description Represents one computation unit that will be run on the one provider machine (e.g. rendering of one frame of an animation). */ -export class Task implements QueueableTask { +export class Task implements QueueableTask { private state = TaskState.New; private results?: OutputType; private error?: Error; @@ -37,12 +37,11 @@ export class Task implements Queueabl private timeoutId?: NodeJS.Timeout; private readonly timeout: number; private readonly maxRetries: number; - private readonly activityReadySetupFunctions: Worker[]; + private readonly activityReadySetupFunctions: Worker[]; constructor( public readonly id: string, - private worker: Worker, - private data?: InputType, + private worker: Worker, options?: TaskOptions, ) { this.timeout = options?.timeout ?? DEFAULTS.TIMEOUT; @@ -99,13 +98,10 @@ export class Task implements Queueabl getResults(): OutputType | undefined { return this.results; } - getData(): InputType | undefined { - return this.data; - } - getWorker(): Worker { + getWorker(): Worker { return this.worker; } - getActivityReadySetupFunctions(): Worker[] { + getActivityReadySetupFunctions(): Worker[] { return this.activityReadySetupFunctions; } getRetriesCount(): number { diff --git a/src/task/work.ts b/src/task/work.ts index 8129ae8e4..9d8ce514a 100644 --- a/src/task/work.ts +++ b/src/task/work.ts @@ -18,10 +18,7 @@ import { Batch } from "./batch"; import { NetworkNode } from "../network"; import { RemoteProcess } from "./process"; -export type Worker = ( - ctx: WorkContext, - data?: InputType, -) => Promise; +export type Worker = (ctx: WorkContext) => Promise; const DEFAULTS = { activityPreparingTimeout: 300_000, @@ -35,7 +32,7 @@ export interface WorkOptions { storageProvider?: StorageProvider; networkNode?: NetworkNode; logger?: Logger; - activityReadySetupFunctions?: Worker[]; + activityReadySetupFunctions?: Worker[]; } export interface CommandOptions { @@ -114,7 +111,7 @@ export class WorkContext { return; } for (const setupFunction of this.options.activityReadySetupFunctions) { - await setupFunction(this, undefined); + await setupFunction(this); } } diff --git a/tests/unit/tasks_service.test.ts b/tests/unit/tasks_service.test.ts index 0092b1245..cb8bf54f2 100644 --- a/tests/unit/tasks_service.test.ts +++ b/tests/unit/tasks_service.test.ts @@ -1,19 +1,19 @@ import * as activityMock from "../mock/rest/activity"; -import { Task, TaskQueue, TaskService, Worker } from "../../src/task"; +import { Task, TaskQueue, TaskService, WorkContext, Worker } from "../../src/task"; import { agreementPoolServiceMock, paymentServiceMock, networkServiceMock, LoggerMock, YagnaMock } from "../mock"; -import { Result } from "../../src"; -let queue; + +let queue: TaskQueue; const logger = new LoggerMock(); describe("Task Service", () => { beforeEach(() => { logger.clear(); activityMock.clear(); - queue = new TaskQueue>(); + queue = new TaskQueue(); }); it("should process new task in queue", async () => { - const worker: Worker = async (ctx) => ctx.run("some_shell_command"); - const task = new Task("1", worker); + const worker = async (ctx: WorkContext) => ctx.run("some_shell_command"); + const task = new Task("1", worker); queue.addToEnd(task); activityMock.setExpectedExeResults([{ stdout: "some_shell_results" }]); const service = new TaskService( @@ -38,7 +38,7 @@ describe("Task Service", () => { }); it("process only allowed number of tasks simultaneously", async () => { - const worker: Worker = async (ctx) => ctx.run("some_shell_command"); + const worker = async (ctx: WorkContext) => ctx.run("some_shell_command"); const task1 = new Task("1", worker); const task2 = new Task("2", worker); const task3 = new Task("3", worker); @@ -65,7 +65,7 @@ describe("Task Service", () => { }); it("should retry task if it failed", async () => { - const worker: Worker = async (ctx) => ctx.run("some_shell_command"); + const worker = async (ctx: WorkContext) => ctx.run("some_shell_command"); const task = new Task("1", worker); queue.addToEnd(task); activityMock.setExpectedErrors([new Error(), new Error(), new Error(), new Error(), new Error()]); @@ -87,11 +87,11 @@ describe("Task Service", () => { }); it("should reject task by user", async () => { - const worker: Worker = async (ctx) => { + const worker = async (ctx: WorkContext) => { const result = await ctx.run("some_shell_command"); if (result.stdout === "invalid_value") ctx.rejectResult("Invalid value computed by provider"); }; - const task = new Task("1", worker, undefined, { maxRetries: 2 }); + const task = new Task("1", worker, { maxRetries: 2 }); queue.addToEnd(task); activityMock.setExpectedExeResults([{ result: "Ok", stdout: "invalid_value" }]); const service = new TaskService( @@ -116,8 +116,8 @@ describe("Task Service", () => { }); it("should reject task if it failed max attempts", async () => { - const worker: Worker = async (ctx) => ctx.run("some_shell_command"); - const task = new Task("1", worker, undefined, { maxRetries: 1 }); + const worker = async (ctx: WorkContext) => ctx.run("some_shell_command"); + const task = new Task("1", worker, { maxRetries: 1 }); queue.addToEnd(task); activityMock.setExpectedErrors(new Array(20).fill(new Error())); const service = new TaskService( @@ -139,11 +139,11 @@ describe("Task Service", () => { }); it("should run setup functions on each activity", async () => { - const setupFunctions: Worker[] = [async (ctx) => ctx.run("init_shell_command")]; - const worker: Worker = async (ctx) => ctx.run("some_shell_command"); - const task1 = new Task("1", worker, null, { activityReadySetupFunctions: setupFunctions }); - const task2 = new Task("2", worker, null, { activityReadySetupFunctions: setupFunctions }); - const task3 = new Task("3", worker, null, { activityReadySetupFunctions: setupFunctions }); + const setupFunctions = [async (ctx: WorkContext) => ctx.run("init_shell_command")]; + const worker = async (ctx: WorkContext) => ctx.run("some_shell_command"); + const task1 = new Task("1", worker, { activityReadySetupFunctions: setupFunctions }); + const task2 = new Task("2", worker, { activityReadySetupFunctions: setupFunctions }); + const task3 = new Task("3", worker, { activityReadySetupFunctions: setupFunctions }); queue.addToEnd(task1); queue.addToEnd(task2); queue.addToEnd(task3); diff --git a/tests/unit/work.test.ts b/tests/unit/work.test.ts index 97a6cf0f8..373dedc5b 100644 --- a/tests/unit/work.test.ts +++ b/tests/unit/work.test.ts @@ -17,7 +17,7 @@ describe("Work Context", () => { describe("Executing", () => { it("should execute run command", async () => { const activity = await Activity.create("test_agreement_id", yagnaApi); - const worker: Worker = async (ctx) => ctx.run("some_shell_command"); + const worker = async (ctx: WorkContext) => ctx.run("some_shell_command"); const ctx = new WorkContext(activity, { logger, activityStateCheckingInterval: 10 }); await ctx.before(); const results = await worker(ctx); @@ -26,7 +26,7 @@ describe("Work Context", () => { it("should execute upload file command", async () => { const activity = await Activity.create("test_agreement_id", yagnaApi); - const worker: Worker = async (ctx) => ctx.uploadFile("./file.txt", "/golem/file.txt"); + const worker = async (ctx: WorkContext) => ctx.uploadFile("./file.txt", "/golem/file.txt"); const ctx = new WorkContext(activity, { logger, activityStateCheckingInterval: 10, @@ -40,7 +40,7 @@ describe("Work Context", () => { it("should execute upload json command", async () => { const activity = await Activity.create("test_agreement_id", yagnaApi); - const worker: Worker = async (ctx) => ctx.uploadJson({ test: true }, "/golem/file.txt"); + const worker = async (ctx: WorkContext) => ctx.uploadJson({ test: true }, "/golem/file.txt"); const ctx = new WorkContext(activity, { logger, activityStateCheckingInterval: 10, @@ -54,7 +54,7 @@ describe("Work Context", () => { it("should execute download file command", async () => { const activity = await Activity.create("test_agreement_id", yagnaApi); - const worker: Worker = async (ctx) => ctx.downloadFile("/golem/file.txt", "./file.txt"); + const worker = async (ctx: WorkContext) => ctx.downloadFile("/golem/file.txt", "./file.txt"); const ctx = new WorkContext(activity, { logger, activityStateCheckingInterval: 10, @@ -69,7 +69,7 @@ describe("Work Context", () => { describe("Batch", () => { it("should execute batch as promise", async () => { const activity = await Activity.create("test_agreement_id", yagnaApi); - const worker: Worker = async (ctx) => { + const worker = async (ctx: WorkContext) => { return ctx .beginBatch() .run("some_shell_command") @@ -99,7 +99,7 @@ describe("Work Context", () => { it("should execute batch as stream", async () => { const activity = await Activity.create("test_agreement_id", yagnaApi); - const worker: Worker = async (ctx) => { + const worker = async (ctx: WorkContext) => { return ctx .beginBatch() .run("some_shell_command") @@ -139,7 +139,7 @@ describe("Work Context", () => { describe("Error handling", () => { it("should return a result with error in case the command to execute is invalid", async () => { const activity = await Activity.create("test_agreement_id", yagnaApi); - const worker: Worker = async (ctx) => ctx.beginBatch().run("invalid_shell_command").end(); + const worker = async (ctx: WorkContext) => ctx.beginBatch().run("invalid_shell_command").end(); const ctx = new WorkContext(activity, { logger, activityStateCheckingInterval: 10, @@ -156,7 +156,7 @@ describe("Work Context", () => { it("should catch error while executing batch as stream with invalid command", async () => { const activity = await Activity.create("test_agreement_id", yagnaApi); - const worker: Worker = async (ctx) => ctx.beginBatch().run("invalid_shell_command").endStream(); + const worker = async (ctx: WorkContext) => ctx.beginBatch().run("invalid_shell_command").endStream(); const ctx = new WorkContext(activity, { logger, activityStateCheckingInterval: 10,