Skip to content

Commit

Permalink
chore: sync beta
Browse files Browse the repository at this point in the history
  • Loading branch information
mgordel committed Jun 19, 2024
2 parents d85ecdc + ecab235 commit 9c60aae
Show file tree
Hide file tree
Showing 38 changed files with 416 additions and 164 deletions.
75 changes: 75 additions & 0 deletions examples/advanced/reuse-allocation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* This advanced example demonstrates create an allocation manually and then reuse
* it across multiple market orders.
*/
import { MarketOrderSpec, GolemNetwork } from "@golem-sdk/golem-js";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";
(async () => {
const glm = new GolemNetwork({
logger: pinoPrettyLogger({
level: "info",
}),
});

try {
await glm.connect();

const allocation = await glm.payment.createAllocation({
budget: 1,
expirationSec: 3600,
});

const firstOrder: MarketOrderSpec = {
demand: {
workload: { imageTag: "golem/alpine:latest" },
},
market: {
rentHours: 0.5,
pricing: {
model: "burn-rate",
avgGlmPerHour: 0.5,
},
},
payment: {
// You can either pass the allocation object ...
allocation,
},
};
const secondOrder: MarketOrderSpec = {
demand: {
workload: { imageTag: "golem/alpine:latest" },
},
market: {
rentHours: 0.5,
pricing: {
model: "burn-rate",
avgGlmPerHour: 0.5,
},
},
payment: {
// ... or just the allocation ID
allocation: allocation.id,
},
};

const lease1 = await glm.oneOf(firstOrder);
const lease2 = await glm.oneOf(secondOrder);

await lease1
.getExeUnit()
.then((exe) => exe.run("echo Running on first lease"))
.then((res) => console.log(res.stdout));
await lease2
.getExeUnit()
.then((exe) => exe.run("echo Running on second lease"))
.then((res) => console.log(res.stdout));

await lease1.finalize();
await lease2.finalize();
await glm.payment.releaseAllocation(allocation);
} catch (err) {
console.error("Failed to run the example", err);
} finally {
await glm.disconnect();
}
})().catch(console.error);
2 changes: 1 addition & 1 deletion examples/advanced/step-by-step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ import { filter, map, switchMap, take } from "rxjs";
// To keep this example simple, we will not retry and just crash if the signing fails
const draftProposal = draftProposals[0]!;
const agreement = await glm.market.proposeAgreement(draftProposal);
console.log("Agreement signed with provider", agreement.getProviderInfo().name);
console.log("Agreement signed with provider", agreement.provider.name);

// Provider is ready to start the computation
// Let's setup payment first
Expand Down
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"tmp": "^0.2.2",
"uuid": "^9.0.1",
"ws": "^8.16.0",
"ya-ts-client": "^1.1.1-beta.1"
"ya-ts-client": "^1.1.2"
},
"devDependencies": {
"@commitlint/cli": "^19.0.3",
Expand Down
94 changes: 79 additions & 15 deletions src/activity/activity.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,28 @@ export class ActivityModuleImpl implements ActivityModule {
async executeScript(activity: Activity, script: ExeScriptRequest): Promise<string> {
this.logger.debug("Executing script on activity", { activityId: activity.id });
try {
this.events.emit("scriptSent", activity, script);
const result = await this.activityApi.executeScript(activity, script);
this.events.emit("scriptExecuted", activity, script, result);
this.events.emit(
"scriptExecuted",
await this.refreshActivity(activity).catch(() => {
this.logger.warn("Failed to refresh activity after script execution", { activityId: activity.id });
return activity;
}),
script,
result,
);
return result;
} catch (error) {
this.events.emit("errorExecutingScript", activity, script, error);
this.events.emit(
"errorExecutingScript",
await this.refreshActivity(activity).catch(() => {
this.logger.warn("Failed to refresh activity after script execution error", { activityId: activity.id });
return activity;
}),
script,
error,
);
throw error;
}
}
Expand All @@ -142,10 +159,26 @@ export class ActivityModuleImpl implements ActivityModule {
this.logger.debug("Fetching batch results", { activityId: activity.id, batchId });
try {
const results = await this.activityApi.getExecBatchResults(activity, batchId, commandIndex, timeout);
this.events.emit("batchResultsReceived", activity, batchId, results);
this.events.emit(
"batchResultsReceived",
await this.refreshActivity(activity).catch(() => {
this.logger.warn("Failed to refresh activity after batch results received", { activityId: activity.id });
return activity;
}),
batchId,
results,
);
return results;
} catch (error) {
this.events.emit("errorGettingBatchResults", activity, batchId, error);
this.events.emit(
"errorGettingBatchResults",
await this.refreshActivity(activity).catch(() => {
this.logger.warn("Failed to refresh activity after batch results error", { activityId: activity.id });
return activity;
}),
batchId,
error,
);
throw error;
}
}
Expand All @@ -156,11 +189,27 @@ export class ActivityModuleImpl implements ActivityModule {
): Observable<StreamingBatchEvent> {
this.logger.debug("Observing streaming batch events", { activityId: activity.id, batchId });
return this.activityApi.getExecBatchEvents(activity, batchId, commandIndex).pipe(
tap((event) => {
this.events.emit("batchEventsReceived", activity, batchId, event);
tap(async (event) => {
this.events.emit(
"batchEventsReceived",
await this.refreshActivity(activity).catch(() => {
this.logger.warn("Failed to refresh activity after batch events received", { activityId: activity.id });
return activity;
}),
batchId,
event,
);
}),
catchError((error) => {
this.events.emit("errorGettingBatchEvents", activity, batchId, error);
catchError(async (error) => {
this.events.emit(
"errorGettingBatchEvents",
await this.refreshActivity(activity).catch(() => {
this.logger.warn("Failed to refresh activity after batch events error", { activityId: activity.id });
return activity;
}),
batchId,
error,
);
throw error;
}),
);
Expand All @@ -169,7 +218,7 @@ export class ActivityModuleImpl implements ActivityModule {
async createActivity(agreement: Agreement): Promise<Activity> {
this.logger.info("Creating activity", {
agreementId: agreement.id,
provider: agreement.getProviderInfo(),
provider: agreement.provider,
});
try {
const activity = await this.activityApi.createActivity(agreement);
Expand All @@ -185,7 +234,7 @@ export class ActivityModuleImpl implements ActivityModule {
this.logger.info("Destroying activity", {
activityId: activity.id,
agreementId: activity.agreement.id,
provider: activity.agreement.getProviderInfo(),
provider: activity.agreement.provider,
});
try {
const updated = await this.activityApi.destroyActivity(activity);
Expand All @@ -205,13 +254,13 @@ export class ActivityModuleImpl implements ActivityModule {
});
try {
const freshActivity = await this.activityApi.getActivity(staleActivity.id);
if (freshActivity.getState() !== staleActivity.getState()) {
if (freshActivity.getState() !== freshActivity.getPreviousState()) {
this.logger.debug("Activity state changed", {
activityId: staleActivity.id,
previousState: staleActivity.getState(),
previousState: freshActivity.getPreviousState(),
newState: freshActivity.getState(),
});
this.events.emit("activityStateChanged", freshActivity, staleActivity.getState());
this.events.emit("activityStateChanged", freshActivity, freshActivity.getPreviousState());
}
return freshActivity;
} catch (error) {
Expand All @@ -236,10 +285,25 @@ export class ActivityModuleImpl implements ActivityModule {
this.logger.debug("Initializing the exe-unit for activity", { activityId: activity.id });
try {
await ctx.before();
this.events.emit("activityInitialized", activity);
this.events.emit(
"workContextInitialized",
await this.refreshActivity(activity).catch(() => {
this.logger.warn("Failed to refresh activity after work context initialization", { activityId: activity.id });
return activity;
}),
);
return ctx;
} catch (error) {
this.events.emit("errorInitializingActivity", activity, error);
this.events.emit(
"errorInitializingWorkContext",
await this.refreshActivity(activity).catch(() => {
this.logger.warn("Failed to refresh activity after work context initialization error", {
activityId: activity.id,
});
return activity;
}),
error,
);
throw error;
}
}
Expand Down
10 changes: 8 additions & 2 deletions src/activity/activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,26 @@ export class Activity {
* @param id The ID of the activity in Yagna
* @param agreement The agreement that's related to this activity
* @param currentState The current state as it was obtained from yagna
* @param previousState The previous state (or New if this is the first time we're creating the activity)
* @param usage Current resource usage vector information
*/
constructor(
public readonly id: string,
public readonly agreement: Agreement,
protected readonly currentState: ActivityStateEnum = ActivityStateEnum.New,
protected readonly previousState: ActivityStateEnum = ActivityStateEnum.Unknown,
protected readonly usage: ActivityUsageInfo,
) {}

public getProviderInfo(): ProviderInfo {
return this.agreement.getProviderInfo();
public get provider(): ProviderInfo {
return this.agreement.provider;
}

public getState() {
return this.currentState;
}

public getPreviousState() {
return this.previousState;
}
}
5 changes: 3 additions & 2 deletions src/activity/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ export type ActivityEvents = {
activityDestroyed: (activity: Activity) => void;
errorDestroyingActivity: (activity: Activity, error: Error) => void;

activityInitialized: (activity: Activity) => void;
errorInitializingActivity: (activity: Activity, error: Error) => void;
workContextInitialized: (activity: Activity) => void;
errorInitializingWorkContext: (activity: Activity, error: Error) => void;

activityStateChanged: (activity: Activity, previousState: ActivityStateEnum) => void;
errorRefreshingActivity: (activity: Activity, error: Error) => void;

scriptSent: (activity: Activity, script: ExeScriptRequest) => void;
scriptExecuted: (activity: Activity, script: ExeScriptRequest, result: string) => void;
errorExecutingScript: (activity: Activity, script: ExeScriptRequest, error: Error) => void;

Expand Down
2 changes: 1 addition & 1 deletion src/activity/exe-script-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe("ExeScriptExecutor", () => {
reset(mockStorageProvider);
reset(mockActivityModule);
resetAllMocks();
when(mockActivity.getProviderInfo()).thenReturn({
when(mockActivity.provider).thenReturn({
id: "test-provider-id",
name: "test-provider-name",
walletAddress: "0xProviderWallet",
Expand Down
6 changes: 3 additions & 3 deletions src/activity/exe-script-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export class ExeScriptExecutor {
WorkErrorCode.ScriptExecutionFailed,
this.activity.agreement,
this.activity,
this.activity.getProviderInfo(),
this.activity.provider,
error,
);
}
Expand Down Expand Up @@ -175,7 +175,7 @@ export class ExeScriptExecutor {
WorkErrorCode.ActivityResultsFetchingFailed,
agreement,
activity,
activity.getProviderInfo(),
activity.provider,
error,
),
);
Expand Down Expand Up @@ -224,7 +224,7 @@ export class ExeScriptExecutor {
WorkErrorCode.ActivityResultsFetchingFailed,
activity.agreement,
activity,
activity.getProviderInfo(),
activity.provider,
);
}
if (error) {
Expand Down
12 changes: 6 additions & 6 deletions src/activity/work/batch.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ describe("Batch", () => {
walletAddress: "0xTestProvider",
};

when(mockAgreement.getProviderInfo()).thenReturn(providerInfo);
when(mockActivity.getProviderInfo()).thenReturn(providerInfo);
when(mockAgreement.provider).thenReturn(providerInfo);
when(mockActivity.provider).thenReturn(providerInfo);
when(mockActivity.agreement).thenReturn(instance(mockAgreement));

activity = instance(mockActivity);
Expand Down Expand Up @@ -161,7 +161,7 @@ describe("Batch", () => {
WorkErrorCode.ScriptExecutionFailed,
activity.agreement,
activity,
activity.getProviderInfo(),
activity.provider,
new Error("FAILURE"),
),
);
Expand All @@ -179,7 +179,7 @@ describe("Batch", () => {
WorkErrorCode.ScriptExecutionFailed,
activity.agreement,
activity,
activity.getProviderInfo(),
activity.provider,
new Error("ERROR"),
),
);
Expand All @@ -197,7 +197,7 @@ describe("Batch", () => {
WorkErrorCode.ScriptExecutionFailed,
activity.agreement,
activity,
activity.getProviderInfo(),
activity.provider,
new Error("FAILURE"),
),
);
Expand Down Expand Up @@ -284,7 +284,7 @@ describe("Batch", () => {
WorkErrorCode.ScriptExecutionFailed,
activity.agreement,
activity,
activity.getProviderInfo(),
activity.provider,
new Error("ERROR"),
),
);
Expand Down
Loading

0 comments on commit 9c60aae

Please sign in to comment.