Skip to content

Commit

Permalink
Merge pull request #957 from golemfactory/feature/JST-931/activity-ev…
Browse files Browse the repository at this point in the history
…ents

Activity events
  • Loading branch information
SewerynKras authored Jun 6, 2024
2 parents 9503c49 + 94556a1 commit cf7f3dd
Show file tree
Hide file tree
Showing 16 changed files with 389 additions and 214 deletions.
179 changes: 148 additions & 31 deletions src/activity/activity.module.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
import { EventEmitter } from "eventemitter3";
import { Agreement } from "../market/agreement";
import { Activity, IActivityApi } from "./index";
import { Activity, IActivityApi, ActivityEvents, Result } from "./index";
import { defaultLogger } from "../shared/utils";
import { GolemServices } from "../golem-network/golem-network";
import { WorkContext, WorkOptions } from "./work";

export interface ActivityEvents {}
import { ExeScriptExecutor, ExeScriptRequest, ExecutionOptions } from "./exe-script-executor";
import { Observable, catchError, tap } from "rxjs";
import { StreamingBatchEvent } from "./results";

export interface ActivityModule {
events: EventEmitter<ActivityEvents>;
Expand All @@ -23,14 +23,50 @@ export interface ActivityModule {
*
* @return The activity that was permanently terminated
*/
destroyActivity(activity: Activity, reason?: string): Promise<Activity>;
destroyActivity(activity: Activity): Promise<Activity>;

/**
* Fetches the latest state of the activity. It's recommended to use this method
* before performing any actions on the activity to make sure it's in the correct state.
* If the fetched activity's state is different from the one you have, an event will be emitted.
*/
refreshActivity(staleActivity: Activity): Promise<Activity>;

/**
* Fetches the activity by its ID from yagna. If the activity doesn't exist, an error will be thrown.
*/
findActivityById(activityId: string): Promise<Activity>;

/**
* Create a work context "within" the activity so that you can perform commands on the rented resources
*
* @return An WorkContext that's fully commissioned and the user can execute their commands
*/
createWorkContext(activity: Activity, options?: WorkOptions): Promise<WorkContext>;

/**
* Factory method for creating a script executor for the activity
*/
createScriptExecutor(activity: Activity, options?: ExecutionOptions): ExeScriptExecutor;

/**
* Execute a script on the activity.
*/
executeScript(activity: Activity, script: ExeScriptRequest): Promise<string>;

/**
* Fetch the results of a batch execution.
*/
getBatchResults(activity: Activity, batchId: string, commandIndex?: number, timeout?: number): Promise<Result[]>;

/**
* Create an observable that will emit events from the streaming batch.
*/
observeStreamingBatchEvents(
activity: Activity,
batchId: string,
commandIndex?: number,
): Observable<StreamingBatchEvent>;
}

/**
Expand Down Expand Up @@ -78,52 +114,133 @@ export class ActivityModuleImpl implements ActivityModule {

private readonly activityApi: IActivityApi;

private readonly fileServer?: IFileServer;

constructor(
private readonly services: GolemServices,
private readonly options: ActivityModuleOptions = {
fileServer: false,
},
) {
constructor(private readonly services: GolemServices) {
this.logger = services.logger;
this.activityApi = services.activityApi;
}
createScriptExecutor(activity: Activity, options?: ExecutionOptions): ExeScriptExecutor {
return new ExeScriptExecutor(activity, this, this.logger.child("executor"), options);
}

async createActivity(agreement: Agreement): Promise<Activity> {
const activity = await this.activityApi.createActivity(agreement);
async executeScript(activity: Activity, script: ExeScriptRequest): Promise<string> {
this.logger.info("Executing script on activity", { activityId: activity.id });
try {
const result = await this.activityApi.executeScript(activity, script);
this.events.emit("scriptExecuted", activity, script, result);
return result;
} catch (error) {
this.events.emit("errorExecutingScript", activity, script, error);
throw error;
}
}
async getBatchResults(
activity: Activity,
batchId: string,
commandIndex?: number | undefined,
timeout?: number | undefined,
): Promise<Result[]> {
this.logger.info("Fetching batch results", { activityId: activity.id, batchId });
try {
const results = await this.activityApi.getExecBatchResults(activity, batchId, commandIndex, timeout);
this.events.emit("batchResultsReceived", activity, batchId, results);
return results;
} catch (error) {
this.events.emit("errorGettingBatchResults", activity, batchId, error);
throw error;
}
}
observeStreamingBatchEvents(
activity: Activity,
batchId: string,
commandIndex?: number | undefined,
): Observable<StreamingBatchEvent> {
this.logger.info("Observing streaming batch events", { activityId: activity.id, batchId });
return this.activityApi.getExecBatchEvents(activity, batchId, commandIndex).pipe(
tap((event) => {
this.events.emit("batchEventsReceived", activity, batchId, event);
}),
catchError((error) => {
this.events.emit("errorGettingBatchEvents", activity, batchId, error);
throw error;
}),
);
}

this.logger.info("Created activity", {
activityId: activity.id,
async createActivity(agreement: Agreement): Promise<Activity> {
this.logger.info("Creating activity", {
agreementId: agreement.id,
provider: agreement.getProviderInfo(),
});

return activity;
try {
const activity = await this.activityApi.createActivity(agreement);
this.events.emit("activityCreated", activity);
return activity;
} catch (error) {
this.events.emit("errorCreatingActivity", error);
throw error;
}
}

async destroyActivity(activity: Activity, reason: string): Promise<Activity> {
const updated = await this.activityApi.destroyActivity(activity);
async destroyActivity(activity: Activity): Promise<Activity> {
this.logger.info("Destroying activity", {
activityId: activity.id,
agreementId: activity.agreement.id,
provider: activity.agreement.getProviderInfo(),
});
try {
const updated = await this.activityApi.destroyActivity(activity);
this.events.emit("activityDestroyed", updated);
return updated;
} catch (error) {
this.events.emit("errorDestroyingActivity", activity, error);
throw error;
}
}

this.logger.info("Destroyed activity", {
activityId: updated.id,
agreementId: updated.agreement.id,
provider: updated.agreement.getProviderInfo(),
async refreshActivity(staleActivity: Activity): Promise<Activity> {
// logging to debug level to avoid spamming the logs because this method is called frequently
this.logger.debug("Fetching latest activity state", {
activityId: staleActivity.id,
lastState: staleActivity.getState(),
});
try {
const freshActivity = await this.activityApi.getActivity(staleActivity.id);
if (freshActivity.getState() !== staleActivity.getState()) {
this.logger.debug("Activity state changed", {
activityId: staleActivity.id,
previousState: staleActivity.getState(),
newState: freshActivity.getState(),
});
this.events.emit("activityStateChanged", freshActivity, staleActivity.getState());
}
return freshActivity;
} catch (error) {
this.events.emit("errorRefreshingActivity", staleActivity, error);
throw error;
}
}

return updated;
async findActivityById(activityId: string): Promise<Activity> {
this.logger.info("Fetching activity by ID", { activityId });
return await this.activityApi.getActivity(activityId);
}

async createWorkContext(activity: Activity, options?: WorkOptions): Promise<WorkContext> {
this.logger.debug("Creating work context for activity", { activityId: activity.id });
const ctx = new WorkContext(activity, this.services.activityApi, this.services.networkApi, {
this.logger.info("Creating work context for activity", { activityId: activity.id });
const ctx = new WorkContext(activity, this, {
yagnaOptions: this.services.yagna.yagnaOptions,
logger: this.logger.child("work-context"),
...options,
});

this.logger.debug("Initializing the exe-unit for activity", { activityId: activity.id });
await ctx.before();

return ctx;
try {
await ctx.before();
this.events.emit("activityInitialized", activity);
return ctx;
} catch (error) {
this.events.emit("errorInitializingActivity", activity, error);
throw error;
}
}
}
10 changes: 0 additions & 10 deletions src/activity/activity.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import { Logger } from "../shared/utils";
import { Agreement, ProviderInfo } from "../market/agreement";
import { ExecutionOptions, ExeScriptExecutor } from "./exe-script-executor";
import { IActivityApi } from "./types";

export enum ActivityStateEnum {
New = "New",
Expand Down Expand Up @@ -47,13 +44,6 @@ export class Activity {
return this.agreement.getProviderInfo();
}

/**
* Temporary helper method that will build a script executor bound to this activity
*/
public createExeScriptExecutor(activityApi: IActivityApi, logger: Logger, options?: ExecutionOptions) {
return new ExeScriptExecutor(this, activityApi, logger, options);
}

public getState() {
return this.currentState;
}
Expand Down
47 changes: 47 additions & 0 deletions src/activity/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { Activity, ActivityStateEnum } from "./activity";
import { Agreement } from "../market/agreement";
import { ExeScriptRequest } from "./exe-script-executor";
import { Result, StreamingBatchEvent } from "./results";
import { Observable } from "rxjs";

export type ActivityEvents = {
activityCreated: (activity: Activity) => void;
errorCreatingActivity: (error: Error) => void;

activityDestroyed: (activity: Activity) => void;
errorDestroyingActivity: (activity: Activity, error: Error) => void;

activityInitialized: (activity: Activity) => void;
errorInitializingActivity: (activity: Activity, error: Error) => void;

activityStateChanged: (activity: Activity, previousState: ActivityStateEnum) => void;
errorRefreshingActivity: (activity: Activity, error: Error) => void;

scriptExecuted: (activity: Activity, script: ExeScriptRequest, result: string) => void;
errorExecutingScript: (activity: Activity, script: ExeScriptRequest, error: Error) => void;

batchResultsReceived: (activity: Activity, batchId: string, results: Result[]) => void;
errorGettingBatchResults: (activity: Activity, batchId: string, error: Error) => void;

batchEventsReceived: (activity: Activity, batchId: string, event: StreamingBatchEvent) => void;
errorGettingBatchEvents: (activity: Activity, batchId: string, error: Error) => void;
};

/**
* Represents a set of use cases related to managing the lifetime of an activity
*/
export interface IActivityApi {
getActivity(id: string): Promise<Activity>;

createActivity(agreement: Agreement): Promise<Activity>;

destroyActivity(activity: Activity): Promise<Activity>;

getActivityState(id: string): Promise<ActivityStateEnum>;

executeScript(activity: Activity, script: ExeScriptRequest): Promise<string>;

getExecBatchResults(activity: Activity, batchId: string, commandIndex?: number, timeout?: number): Promise<Result[]>;

getExecBatchEvents(activity: Activity, batchId: string, commandIndex?: number): Observable<StreamingBatchEvent>;
}
3 changes: 0 additions & 3 deletions src/activity/config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { Logger, defaultLogger } from "../shared/utils";
import { ExecutionOptions } from "./exe-script-executor";

const DEFAULTS = {
Expand All @@ -16,7 +15,6 @@ export class ExecutionConfig {
public readonly activityExecuteTimeout: number;
public readonly activityExeBatchResultPollIntervalSeconds: number;
public readonly activityExeBatchResultMaxRetries: number;
public readonly logger: Logger;

constructor(options?: ExecutionOptions) {
this.activityRequestTimeout = options?.activityRequestTimeout || DEFAULTS.activityRequestTimeout;
Expand All @@ -25,6 +23,5 @@ export class ExecutionConfig {
options?.activityExeBatchResultMaxRetries || DEFAULTS.activityExeBatchResultMaxRetries;
this.activityExeBatchResultPollIntervalSeconds =
options?.activityExeBatchResultPollIntervalSeconds || DEFAULTS.activityExeBatchResultPollIntervalSeconds;
this.logger = options?.logger || defaultLogger("work");
}
}
Loading

0 comments on commit cf7f3dd

Please sign in to comment.