Skip to content

Commit

Permalink
Merge pull request #673 from golemfactory/feature/JST-598/simplify-wo…
Browse files Browse the repository at this point in the history
…rker-type

JST-598: simplify worker type
  • Loading branch information
SewerynKras authored Nov 23, 2023
2 parents 49297d9 + 9cd3aa9 commit 8487362
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { TaskExecutor } from "@golem-sdk/golem-js";
yagnaOptions: { apiKey: "try_golem" },
});

const result = await executor.run(async (ctx) => {
await executor.run(async (ctx) => {
const res = await ctx
.beginBatch()
.uploadFile("./worker.mjs", "/golem/input/worker.mjs")
Expand All @@ -17,6 +17,7 @@ import { TaskExecutor } from "@golem-sdk/golem-js";

res.on("data", (data) => (data.index == 2 ? console.log(data.stdout) : ""));
res.on("error", (error) => console.error(error));
res.on("close", () => executor.shutdown());
return new Promise((resolve) => res.on("close", resolve));
});
await executor.shutdown();
})();
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { TaskExecutor } from "@golem-sdk/golem-js";
yagnaOptions: { apiKey: "try_golem" },
});

const result = await executor.run(async (ctx) => {
await executor.run(async (ctx) => {
const res = await ctx
.beginBatch()
.uploadFile("./worker.mjs", "/golem/input/worker.mjs")
Expand All @@ -17,6 +17,7 @@ import { TaskExecutor } from "@golem-sdk/golem-js";

res.on("data", (result) => console.log(result));
res.on("error", (error) => console.error(error));
res.on("close", () => executor.shutdown());
return new Promise((resolve) => res.on("close", resolve));
});
await executor.shutdown();
})();
39 changes: 15 additions & 24 deletions src/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { AgreementPoolService } from "../agreement";
import { Task, TaskQueue, TaskService, Worker, TaskOptions } from "../task";
import { PaymentService, PaymentOptions } from "../payment";
import { NetworkService } from "../network";
import { Result } from "../activity";
import { sleep, Logger, LogLevel, runtimeContextChecker, Yagna } from "../utils";
import { StorageProvider, GftpStorageProvider, NullStorageProvider, WebSocketBrowserStorageProvider } from "../storage";
import { ExecutorConfig } from "./config";
Expand Down Expand Up @@ -116,8 +115,8 @@ export class TaskExecutor {
private paymentService: PaymentService;
private networkService?: NetworkService;
private statsService: StatsService;
private activityReadySetupFunctions: Worker[] = [];
private taskQueue: TaskQueue<Task<unknown, unknown>>;
private activityReadySetupFunctions: Worker<unknown>[] = [];
private taskQueue: TaskQueue;
private storageProvider?: StorageProvider;
private logger?: Logger;
private lastTaskIndex = 0;
Expand Down Expand Up @@ -188,7 +187,7 @@ export class TaskExecutor {
this.logger = this.options.logger;
this.yagna = new Yagna(this.configOptions.yagnaOptions);
const yagnaApi = this.yagna.getApi();
this.taskQueue = new TaskQueue<Task<unknown, unknown>>();
this.taskQueue = new TaskQueue();
this.agreementPoolService = new AgreementPoolService(yagnaApi, this.options);
this.paymentService = new PaymentService(yagnaApi, this.options);
this.marketService = new MarketService(this.agreementPoolService, yagnaApi, this.options);
Expand Down Expand Up @@ -352,7 +351,7 @@ export class TaskExecutor {
* });
* ```
*/
beforeEach(worker: Worker) {
beforeEach(worker: Worker<unknown>) {
this.activityReadySetupFunctions = [worker];
}

Expand All @@ -377,7 +376,7 @@ export class TaskExecutor {
* });
* ```
*/
onActivityReady(worker: Worker) {
onActivityReady(worker: Worker<unknown>) {
this.activityReadySetupFunctions.push(worker);
}

Expand All @@ -392,11 +391,8 @@ export class TaskExecutor {
* await executor.run(async (ctx) => console.log((await ctx.run("echo 'Hello World'")).stdout));
* ```
*/
async run<OutputType = Result>(
worker: Worker<undefined, OutputType>,
options?: TaskOptions,
): Promise<OutputType | undefined> {
return this.executeTask<undefined, OutputType>(worker, undefined, options);
async run<OutputType>(worker: Worker<OutputType>, options?: TaskOptions): Promise<OutputType> {
return this.executeTask<OutputType>(worker, options);
}

private async createPackage(
Expand All @@ -414,24 +410,21 @@ export class TaskExecutor {
return packageInstance;
}

private async executeTask<InputType, OutputType>(
worker: Worker<InputType, OutputType>,
data?: InputType,
options?: TaskOptions,
): Promise<OutputType | undefined> {
const task = new Task<InputType, OutputType>((++this.lastTaskIndex).toString(), worker, data, {
private async executeTask<OutputType>(worker: Worker<OutputType>, options?: TaskOptions): Promise<OutputType> {
const task = new Task((++this.lastTaskIndex).toString(), worker, {
maxRetries: options?.maxRetries ?? this.options.maxTaskRetries,
timeout: options?.timeout ?? this.options.taskTimeout,
activityReadySetupFunctions: this.activityReadySetupFunctions,
});
this.taskQueue.addToEnd(task as Task<unknown, unknown>);
this.taskQueue.addToEnd(task);
while (this.isRunning) {
if (task.isFinished()) {
if (task.isRejected()) throw task.getError();
return task.getResults();
return task.getResults() as OutputType;
}
await sleep(2000, true);
}
throw new Error("Task executor has been stopped");
}

/**
Expand All @@ -453,22 +446,20 @@ export class TaskExecutor {
* const error = await job.fetchError();
* ```
*/
public async createJob<InputType = unknown, OutputType = unknown>(
worker: Worker<InputType, OutputType>,
): Promise<Job<OutputType>> {
public async createJob<OutputType>(worker: Worker<OutputType>): Promise<Job<OutputType>> {
const jobId = v4();
const job = new Job<OutputType>(jobId, this.options.jobStorage);
await job.saveInitialState();

const task = new Task(jobId, worker, undefined, {
const task = new Task(jobId, worker, {
maxRetries: this.options.maxTaskRetries,
timeout: this.options.taskTimeout,
activityReadySetupFunctions: this.activityReadySetupFunctions,
});
task.onStateChange((taskState) => {
job.saveState(taskState, task.getResults(), task.getError());
});
this.taskQueue.addToEnd(task as Task<unknown, unknown>);
this.taskQueue.addToEnd(task);

return job;
}
Expand Down
6 changes: 3 additions & 3 deletions src/golem_network/golem_network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export interface GolemNetworkConfig {
/**
* Function that will be run before each job. You can use it to set up the environment for your job. For example, you can upload a file to the provider.
*/
beforeEachJob?: Worker<unknown, unknown>;
beforeEachJob?: Worker<unknown>;
/**
* Job storage. By default Golem Network uses a simple in-memory storage for job statuses and results. In a real application you should use some persistent storage (e.g. a database). See {@link JobStorage} for more information.
*/
Expand Down Expand Up @@ -98,7 +98,7 @@ export class GolemNetwork {
* console.log(status);
* ```
*/
public async createJob<Output = unknown>(worker: Worker<unknown, Output>) {
public async createJob<Output>(worker: Worker<Output>) {
return this.executor.createJob(worker);
}

Expand All @@ -113,7 +113,7 @@ export class GolemNetwork {
* @param worker Worker function to run
* @returns Worker function result
*/
public async runTask<Output = unknown>(worker: Worker<undefined, Output>) {
public async runTask<Output>(worker: Worker<Output>) {
return this.executor.run<Output>(worker);
}

Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ export { Logger, LogLevel, jsonLogger, nullLogger, consoleLogger, pinoLogger, de
export { Yagna } from "./utils/yagna/yagna";
export { Job, JobStorage, JobState } from "./job";
export { GolemNetwork, GolemNetworkConfig } from "./golem_network";
export { Worker, WorkContext } from "./task";
14 changes: 8 additions & 6 deletions src/task/queue.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { Task } from "./task";

/**
* @internal
*/
Expand All @@ -8,15 +10,15 @@ export interface QueueableTask {
/**
* @internal
*/
export class TaskQueue<Task extends QueueableTask> {
protected itemsStack: Array<Task> = [];
export class TaskQueue<T extends QueueableTask = Task> {
protected itemsStack: Array<T> = [];

addToEnd(task: Task) {
addToEnd(task: T) {
this._checkIfTaskIsEligibleForAdd(task);
this.itemsStack.push(task);
}

addToBegin(task: Task) {
addToBegin(task: T) {
this._checkIfTaskIsEligibleForAdd(task);
this.itemsStack.unshift(task);
}
Expand All @@ -25,11 +27,11 @@ export class TaskQueue<Task extends QueueableTask> {
return this.itemsStack.length;
}

get(): Task | undefined {
get(): T | undefined {
return this.itemsStack.shift();
}

private _checkIfTaskIsEligibleForAdd(task: Task) {
private _checkIfTaskIsEligibleForAdd(task: T) {
if (!task.isQueueable()) throw new Error("You cannot add a task that is not in the correct state");
}
}
19 changes: 5 additions & 14 deletions src/task/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class TaskService {

constructor(
private yagnaApi: YagnaApi,
private tasksQueue: TaskQueue<Task<unknown, unknown>>,
private tasksQueue: TaskQueue,
private agreementPoolService: AgreementPoolService,
private paymentService: PaymentService,
private networkService?: NetworkService,
Expand Down Expand Up @@ -72,7 +72,7 @@ export class TaskService {

private async startTask(task: Task) {
task.start();
this.logger?.debug(`Starting task. ID: ${task.id}, Data: ${task.getData()}`);
this.logger?.debug(`Starting task. ID: ${task.id}`);
++this.activeTasksCount;

const agreement = await this.agreementPoolService.getAgreement();
Expand All @@ -89,17 +89,12 @@ export class TaskService {
}),
);

this.logger?.info(
`Task ${task.id} sent to provider ${agreement.provider.name}.${
task.getData() ? " Data: " + task.getData() : ""
}`,
);
this.logger?.info(`Task ${task.id} sent to provider ${agreement.provider.name}.`);

this.paymentService.acceptDebitNotes(agreement.id);
this.paymentService.acceptPayments(agreement);
const activityReadySetupFunctions = task.getActivityReadySetupFunctions();
const worker = task.getWorker();
const data = task.getData();
const networkNode = await this.networkService?.addNode(agreement.provider.id);
const ctx = new WorkContext(activity, {
activityReadySetupFunctions: this.activitySetupDone.has(activity.id) ? [] : activityReadySetupFunctions,
Expand All @@ -115,14 +110,10 @@ export class TaskService {
this.activitySetupDone.add(activity.id);
this.logger?.debug(`Activity setup completed in activity ${activity.id}`);
}
const results = await worker(ctx, data);
const results = await worker(ctx);
task.stop(results);
this.options.eventTarget?.dispatchEvent(new Events.TaskFinished({ id: task.id }));
this.logger?.info(
`Task ${task.id} computed by provider ${agreement.provider.name}.${
task.getData() ? " Data: " + task.getData() : ""
}`,
);
this.logger?.info(`Task ${task.id} computed by provider ${agreement.provider.name}.`);
} catch (error) {
task.stop(undefined, error);
const reason = error?.response?.data?.message || error.message || error.toString();
Expand Down
16 changes: 6 additions & 10 deletions src/task/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export type TaskOptions = {
/** timeout in ms for task execution, including retries, default = 300_000 (5min) */
timeout?: number;
/** array of setup functions to run on each activity */
activityReadySetupFunctions?: Worker[];
activityReadySetupFunctions?: Worker<unknown>[];
};

const DEFAULTS = {
Expand All @@ -28,7 +28,7 @@ const DEFAULTS = {
*
* @description Represents one computation unit that will be run on the one provider machine (e.g. rendering of one frame of an animation).
*/
export class Task<InputType = unknown, OutputType = unknown> implements QueueableTask {
export class Task<OutputType = unknown> implements QueueableTask {
private state = TaskState.New;
private results?: OutputType;
private error?: Error;
Expand All @@ -37,12 +37,11 @@ export class Task<InputType = unknown, OutputType = unknown> implements Queueabl
private timeoutId?: NodeJS.Timeout;
private readonly timeout: number;
private readonly maxRetries: number;
private readonly activityReadySetupFunctions: Worker[];
private readonly activityReadySetupFunctions: Worker<unknown>[];

constructor(
public readonly id: string,
private worker: Worker<InputType, OutputType>,
private data?: InputType,
private worker: Worker<OutputType>,
options?: TaskOptions,
) {
this.timeout = options?.timeout ?? DEFAULTS.TIMEOUT;
Expand Down Expand Up @@ -99,13 +98,10 @@ export class Task<InputType = unknown, OutputType = unknown> implements Queueabl
getResults(): OutputType | undefined {
return this.results;
}
getData(): InputType | undefined {
return this.data;
}
getWorker(): Worker<InputType> {
getWorker(): Worker<OutputType> {
return this.worker;
}
getActivityReadySetupFunctions(): Worker[] {
getActivityReadySetupFunctions(): Worker<unknown>[] {
return this.activityReadySetupFunctions;
}
getRetriesCount(): number {
Expand Down
9 changes: 3 additions & 6 deletions src/task/work.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ import { Batch } from "./batch";
import { NetworkNode } from "../network";
import { RemoteProcess } from "./process";

export type Worker<InputType = unknown, OutputType = unknown> = (
ctx: WorkContext,
data?: InputType,
) => Promise<OutputType>;
export type Worker<OutputType> = (ctx: WorkContext) => Promise<OutputType>;

const DEFAULTS = {
activityPreparingTimeout: 300_000,
Expand All @@ -35,7 +32,7 @@ export interface WorkOptions {
storageProvider?: StorageProvider;
networkNode?: NetworkNode;
logger?: Logger;
activityReadySetupFunctions?: Worker[];
activityReadySetupFunctions?: Worker<unknown>[];
}

export interface CommandOptions {
Expand Down Expand Up @@ -114,7 +111,7 @@ export class WorkContext {
return;
}
for (const setupFunction of this.options.activityReadySetupFunctions) {
await setupFunction(this, undefined);
await setupFunction(this);
}
}

Expand Down
Loading

0 comments on commit 8487362

Please sign in to comment.