Skip to content

Commit

Permalink
Merge pull request #906 from golemfactory/grisha87/stabilize-beta-exa…
Browse files Browse the repository at this point in the history
…mples

fix(beta): stabilized examples so that they work, fixed pool implementation
  • Loading branch information
mgordel authored Apr 22, 2024
2 parents a30471a + 4f35fad commit 7307f97
Show file tree
Hide file tree
Showing 16 changed files with 283 additions and 132 deletions.
1 change: 0 additions & 1 deletion examples/basic/hello-world.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ import {
.startCollectingProposals({
demandOffer,
paymentPlatform: "erc20-holesky-tglm",
bufferSize: 15,
})
.subscribe((proposalsBatch) => proposalsBatch.forEach((proposal) => proposalPool.add(proposal)));
const draftProposal = await proposalPool.acquire();
Expand Down
25 changes: 16 additions & 9 deletions examples/deployment/new-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,28 @@ async function main() {

const deployment = builder.getDeployment();

// Start your deployment
await deployment.start();

const activityPoolApp = deployment.getActivityPool("app");
const activity1 = await activityPoolApp.acquire();
// Get your pool of activities for specified need
const appPool = deployment.getActivityPool("app");
const dbPool = deployment.getActivityPool("db");

const activity2 = await deployment.getActivityPool("db").acquire();
// Get an instance out of the pool for use
const app = await appPool.acquire();
const db = await dbPool.acquire();

const result = await activity1.run("node -v");
console.log(result.stdout);
await activityPoolApp.release(activity1);
await activityPoolApp.drain();
// Run a command on the app VM instance
const appResult = await app.run("node -v");
console.log(appResult.stdout);
await appPool.release(app);

const result2 = await activity2.run("ls /");
console.log(result2.stdout);
// Run a command on the db VM instance
const dbResult = await db.run("ls /");
console.log(dbResult.stdout);
await dbPool.release(db);

// Stop the deployment cleanly
await deployment.stop();
} catch (err) {
console.error("Failed to run the example", err);
Expand Down
40 changes: 19 additions & 21 deletions examples/pool/hello-world.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
import {
YagnaApi,
MarketModuleImpl,
ActivityModuleImpl,
PaymentModuleImpl,
DraftOfferProposalPool,
AgreementPool,
ActivityPool,
Package,
AgreementPool,
Allocation,
DraftOfferProposalPool,
MarketModuleImpl,
Package,
PaymentModuleImpl,
YagnaApi,
} from "@golem-sdk/golem-js";

(async function main() {
const yagnaApi = new YagnaApi();

try {
await yagnaApi.connect();

const modules = {
market: new MarketModuleImpl(yagnaApi),
activity: new ActivityModuleImpl(yagnaApi),
payment: new PaymentModuleImpl(yagnaApi),
};

const demandOptions = {
demand: {
image: "golem/alpine:latest",
Expand Down Expand Up @@ -48,35 +50,32 @@ import {
const workload = Package.create({
imageTag: demandOptions.demand.image,
});

const allocation = await Allocation.create(yagnaApi, {
account: {
address: (await yagnaApi.identity.getIdentity()).identity,
platform: "erc20-holesky-tglm",
},
budget: 1,
});

const demandOffer = await modules.market.buildDemand(workload, allocation, {});

const proposalSubscription = modules.market
.startCollectingProposals({
demandOffer,
paymentPlatform: "erc20-holesky-tglm",
bufferSize: 15,
})
.subscribe((proposalsBatch) => proposalsBatch.forEach((proposal) => proposalPool.add(proposal)));
const agreementPool = new AgreementPool(modules, proposalPool, { replicas: { max: 2 } });

/** How many providers you plan to engage simultaneously */
const CONCURRENCY = 2;

const agreementPool = new AgreementPool(modules, proposalPool, { replicas: { max: CONCURRENCY } });
const activityPool = new ActivityPool(modules, agreementPool, {
replicas: 2,
replicas: CONCURRENCY,
});

const int = setInterval(() => {
console.log(
"Pool sizes (total/available/leased)",
proposalPool.count(),
proposalPool.availableCount(),
proposalPool.leasedCount(),
);
}, 5000);

const ctx = await activityPool.acquire();
const result = await ctx.run("echo Hello World");
console.log(result.stdout);
Expand All @@ -89,9 +88,8 @@ import {
await activityPool.release(ctx2);

proposalSubscription.unsubscribe();
await activityPool.drain();
await agreementPool.drain();
clearInterval(int);
await activityPool.drainAndClear();
await agreementPool.drainAndClear();
} catch (err) {
console.error("Pool execution failed:", err);
} finally {
Expand Down
31 changes: 25 additions & 6 deletions src/activity/activity-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export interface ActivityPoolEvents {
acquired: (activity: ActivityDTO) => void;
released: (activity: ActivityDTO) => void;
destroyed: (activity: ActivityDTO) => void;
created: (activity: ActivityDTO) => void;
error: (error: GolemWorkError) => void;
}

Expand Down Expand Up @@ -94,11 +95,20 @@ export class ActivityPool {
async destroy(activity: WorkContext) {
await this.activityPool.destroy(activity);
await this.agreementPool.destroy(activity.activity.agreement);
this.events.emit("destroyed", activity.getDto());
}

async drain() {
return this.activityPool.drain();
/**
* Sets the pool into draining mode and then clears it
*
* When set to drain mode, no new acquires will be possible. At the same time, all activities in the pool will be destroyed on the Provider.
*
* @return Resolves when all activities in the pool are destroyed
*/
async drainAndClear() {
await this.activityPool.drain();
await this.activityPool.clear();
this.events.emit("end");
return;
}

async runOnce<OutputType>(worker: Worker<OutputType>): Promise<OutputType> {
Expand All @@ -124,6 +134,13 @@ export class ActivityPool {
return this.activityPool.available;
}

/**
* Wait till the pool is ready to use (min number of items in pool are usable)
*/
ready(): Promise<void> {
return this.activityPool.ready();
}

private createPoolFactory(): Factory<WorkContext> {
return {
create: async (): Promise<WorkContext> => {
Expand All @@ -132,12 +149,14 @@ export class ActivityPool {
const activity = await this.modules.activity.createActivity(agreement);
const ctx = new WorkContext(activity, {});
await ctx.before();
this.events.emit("created", ctx.getDto());
return ctx;
},
destroy: async (activity: WorkContext) => {
destroy: async (ctx: WorkContext) => {
this.logger.debug("Destroying activity from the pool");
await this.modules.activity.destroyActivity(activity.activity);
await this.agreementPool.release(activity.activity.agreement);
await this.modules.activity.destroyActivity(ctx.activity);
await this.agreementPool.release(ctx.activity.agreement);
this.events.emit("destroyed", ctx.getDto());
},
validate: async (activity: WorkContext) => {
try {
Expand Down
41 changes: 32 additions & 9 deletions src/agreement/agreement-pool.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Agreement, AgreementOptions } from "./agreement";
import { createPool, Factory, Pool } from "generic-pool";
import { defaultLogger, Logger } from "../shared/utils";
import { DraftOfferProposalPool } from "../market/draft-offer-proposal-pool";
import { MarketModule, GolemMarketError, MarketErrorCode } from "../market";
import { DraftOfferProposalPool, GolemMarketError, MarketErrorCode, MarketModule } from "../market";
import { AgreementDTO } from "./service";
import { EventEmitter } from "eventemitter3";
import { PaymentModule } from "../payment";
Expand All @@ -19,9 +18,11 @@ export interface AgreementPoolEvents {
end: () => void;
acquired: (agreement: AgreementDTO) => void;
released: (agreement: AgreementDTO) => void;
created: (agreement: AgreementDTO) => void;
destroyed: (agreement: AgreementDTO) => void;
error: (error: GolemMarketError) => void;
}

const MAX_REPLICAS = 100;

export class AgreementPool {
Expand Down Expand Up @@ -82,11 +83,20 @@ export class AgreementPool {

async destroy(agreement: Agreement): Promise<void> {
await this.agreementPool.destroy(agreement);
this.events.emit("destroyed", agreement.getDto());
}

async drain() {
return this.agreementPool.drain();
/**
* Sets the pool into draining mode and then clears it
*
* When set to drain mode, no new acquires will be possible. At the same time, all agreements in the pool will be terminated with the Providers.
*
* @return Resolves when all agreements are terminated
*/
async drainAndClear() {
await this.agreementPool.drain();
await this.agreementPool.clear();
this.events.emit("end");
return;
}

getSize() {
Expand All @@ -105,18 +115,31 @@ export class AgreementPool {
return this.agreementPool.available;
}

/**
* Wait till the pool is ready to use (min number of items in pool are usable)
*/
ready(): Promise<void> {
return this.agreementPool.ready();
}

private createPoolFactory(): Factory<Agreement> {
return {
create: async (): Promise<Agreement> => {
this.logger.debug("Creating new agreement to add to pool");
const proposal = await this.proposalPool.acquire();
return this.modules.market.proposeAgreement(this.modules.payment, proposal, this.options?.agreementOptions);
const agreement = await this.modules.market.proposeAgreement(
this.modules.payment,
proposal,
this.options?.agreementOptions,
);
this.events.emit("created", agreement.getDto());
return agreement;
},
destroy: async (agreement: Agreement) => {
this.logger.debug("Destroying agreement from the pool");
await this.modules.market.terminateAgreement(agreement);
//TODO: make Agreement compatible with ProposalNew instead of Proposal
this.logger.debug("Destroying agreement from the pool", { agreementId: agreement.id });
await this.modules.market.terminateAgreement(agreement, "Finished");
await this.proposalPool.remove(agreement.proposal);
this.events.emit("destroyed", agreement.getDto());
},
validate: async (agreement: Agreement) => {
try {
Expand Down
31 changes: 20 additions & 11 deletions src/agreement/agreement.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export interface AgreementEvents {
rejected: (details: { id: string; provider: ProviderInfo; reason: string }) => void;
terminated: (details: { id: string; provider: ProviderInfo; reason: string }) => void;
}

export interface ProviderInfo {
name: string;
id: string;
Expand All @@ -34,6 +35,7 @@ export interface AgreementOptions {

invoiceFilter?: InvoiceFilter;
}

/**
* Agreement module - an object representing the contract between the requestor and the provider.
* @hidden
Expand Down Expand Up @@ -97,7 +99,7 @@ export class Agreement {
getProviderInfo(): ProviderInfo {
return {
name: "todo",
id: "todo",
id: "todo" + this.id,
walletAddress: "todo",
};
}
Expand Down Expand Up @@ -151,24 +153,31 @@ export class Agreement {
* @description Blocking function waits till agreement will be terminated
* @throws Error if the agreement will be unable to terminate
*/
async terminate(reason: { [key: string]: string } = { message: "Finished" }) {
async terminate(reason = "Finished") {
try {
if ((await this.getState()) !== "Terminated")
if ((await this.getState()) !== "Terminated") {
await withTimeout(
this.yagnaApi.market.terminateAgreement(this.id, reason),
// TODO: Make a fix in ya-ts-client typings so that's going to be specifically {message:string}
this.yagnaApi.market.terminateAgreement(this.id, {
message: reason,
}),
this.options.agreementRequestTimeout,
);
this.events.emit("terminated", {
id: this.id,
provider: this.getProviderInfo(),
reason: reason.message,
});
this.logger.debug(`Agreement terminated`, { id: this.id });

this.events.emit("terminated", {
id: this.id,
provider: this.getProviderInfo(),
reason: reason,
});

this.logger.debug(`Agreement terminated`, { id: this.id, reason });
}
} catch (error) {
throw new GolemMarketError(
`Unable to terminate agreement ${this.id}. ${error.response?.data?.message || error.response?.data || error}`,
MarketErrorCode.AgreementTerminationFailed,
// this.proposal.demand, TODO
this.proposal.demand,
error,
);
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/agreement/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class AgreementFactory {
throw new GolemMarketError(
`Unable to create agreement. Invalid response from the server`,
MarketErrorCode.AgreementCreationFailed,
// proposal.demand, TODO
proposal.demand,
);
}
// const data = await this.yagnaApi.market.getAgreement(agreementId);
Expand All @@ -69,8 +69,7 @@ export class AgreementFactory {
throw new GolemMarketError(
`Unable to create agreement ${error?.response?.data?.message || error?.response?.data || error}`,
MarketErrorCode.AgreementCreationFailed,
undefined,
// proposal.demand,
proposal.demand,
error,
);
}
Expand Down
Loading

0 comments on commit 7307f97

Please sign in to comment.