Skip to content

Commit

Permalink
Merge pull request #943 from golemfactory/feature/JST-949/lease-module
Browse files Browse the repository at this point in the history
Lease Module
  • Loading branch information
SewerynKras authored May 29, 2024
2 parents 4b6910c + 4eb8a12 commit 0987362
Show file tree
Hide file tree
Showing 18 changed files with 234 additions and 175 deletions.
2 changes: 1 addition & 1 deletion examples/advanced/hello-world.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

const agreement = await glm.market.proposeAgreement(draftProposal);

const lease = await glm.market.createLease(agreement, allocation);
const lease = glm.lease.createLease(agreement, allocation);
const activity = await glm.activity.createActivity(agreement);

// We managed to create the activity, no need to look for more agreement candidates
Expand Down
2 changes: 1 addition & 1 deletion examples/advanced/local-image/serveLocalGvmi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const getImagePath = (path: string) => fileURLToPath(new URL(path, import.meta.u

const agreement = await glm.market.proposeAgreement(draftProposal);

const lease = await glm.market.createLease(agreement, allocation);
const lease = glm.lease.createLease(agreement, allocation);
const activity = await glm.activity.createActivity(agreement);

// We managed to create the activity, no need to look for more agreement candidates
Expand Down
3 changes: 2 additions & 1 deletion examples/advanced/manual-pools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ const demandOptions = {
market: glm.market,
activity: glm.activity,
payment: glm.payment,
lease: glm.lease,
};

const pool = depModules.market.createLeaseProcessPool(proposalPool, allocation, {
const pool = depModules.lease.createLeaseProcessPool(proposalPool, allocation, {
replicas: { max: CONCURRENCY },
});

Expand Down
2 changes: 1 addition & 1 deletion src/activity/activity.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export interface ActivityModule {
*
* @return An WorkContext that's fully commissioned and the user can execute their commands
*/
createWorkContext(activity: Activity): Promise<WorkContext>;
createWorkContext(activity: Activity, options?: WorkOptions): Promise<WorkContext>;
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/experimental/deployment/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export class GolemDeploymentBuilder {
market: this.glm.market,
activity: this.glm.activity,
network: this.glm.network,
lease: this.glm.lease,
},
{
dataTransferProtocol: this.glm.options.dataTransferProtocol ?? "gftp",
Expand Down
5 changes: 4 additions & 1 deletion src/experimental/deployment/deployment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { CreateLeaseProcessPoolOptions } from "./builder";
import { Subscription } from "rxjs";
import { LeaseProcessPool } from "../../lease-process";
import { DataTransferProtocol } from "../../shared/types";
import { LeaseModule } from "../../lease-process/lease.module";

export enum DeploymentState {
INITIAL = "INITIAL",
Expand Down Expand Up @@ -83,6 +84,7 @@ export class Deployment {
activity: ActivityModule;
payment: PaymentModule;
network: NetworkModule;
lease: LeaseModule;
};

constructor(
Expand All @@ -94,6 +96,7 @@ export class Deployment {
activity: ActivityModule;
payment: PaymentModule;
network: NetworkModule;
lease: LeaseModule;
},
options: DeploymentOptions,
) {
Expand Down Expand Up @@ -174,7 +177,7 @@ export class Deployment {
error: (e) => this.logger.error("Error while collecting proposals", e),
});

const leaseProcessPool = this.modules.market.createLeaseProcessPool(proposalPool, allocation, {
const leaseProcessPool = this.modules.lease.createLeaseProcessPool(proposalPool, allocation, {
replicas: pool.options.deployment?.replicas,
network,
leaseProcessOptions: {
Expand Down
14 changes: 9 additions & 5 deletions src/golem-network/golem-network.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { AgreementApiAdapter } from "../shared/yagna/adapters/agreement-api-adap
import { GolemNetwork, MarketOrderSpec } from "./golem-network";
import { _, instance, mock, reset, when, verify } from "@johanblumenberg/ts-mockito";
import { GftpStorageProvider } from "../shared/storage";
import { LeaseModuleImpl } from "../lease-process/lease.module";

const order: MarketOrderSpec = Object.freeze({
demand: {
Expand All @@ -31,6 +32,7 @@ const mockMarket = mock(MarketModuleImpl);
const mockPayment = mock(PaymentModuleImpl);
const mockActivity = mock(ActivityModuleImpl);
const mockNetwork = mock(NetworkModuleImpl);
const mockLease = mock(LeaseModuleImpl);
const mockYagna = mock(YagnaApi);
const mockPaymentApi = mock(PaymentApiAdapter);
const mockActivityApi = mock(ActivityApiAdapter);
Expand All @@ -44,6 +46,7 @@ afterEach(() => {
reset(mockMarket);
reset(mockPayment);
reset(mockNetwork);
reset(mockLease);
reset(mockPaymentApi);
reset(mockActivityApi);
reset(mockAgreementApi);
Expand All @@ -59,6 +62,7 @@ function getGolemNetwork() {
market: instance(mockMarket),
payment: instance(mockPayment),
network: instance(mockNetwork),
lease: instance(mockLease),
paymentApi: instance(mockPaymentApi),
activityApi: instance(mockActivityApi),
agreementApi: instance(mockAgreementApi),
Expand All @@ -85,20 +89,20 @@ function mockPaymentCreateAllocation() {
describe("Golem Network", () => {
describe("oneOf()", () => {
it("should create a lease and clean it up when disconnected", async () => {
const mockLease = {
const mockLeaseProcess = {
finalize: jest.fn().mockImplementation(() => Promise.resolve()) as LeaseProcess["finalize"],
} as LeaseProcess;
when(mockMarket.createLease(_, _, _)).thenReturn(mockLease);
when(mockLease.createLease(_, _, _)).thenReturn(mockLeaseProcess);
const mockSubscription = mockStartCollectingProposals();
const mockAllocation = mockPaymentCreateAllocation();
jest.spyOn(DraftOfferProposalPool.prototype, "acquire").mockResolvedValue({} as OfferProposal);

const glm = getGolemNetwork();
await glm.connect();
const lease = await glm.oneOf(order);
expect(lease === mockLease).toBe(true);
expect(lease === mockLeaseProcess).toBe(true);
await glm.disconnect();
expect(mockLease.finalize).toHaveBeenCalled();
expect(mockLeaseProcess.finalize).toHaveBeenCalled();
expect(mockSubscription.unsubscribe).toHaveBeenCalled();
verify(mockPayment.releaseAllocation(mockAllocation)).once();
});
Expand All @@ -109,7 +113,7 @@ describe("Golem Network", () => {
const mockAllocation = mockPaymentCreateAllocation();
const mockLeasePool = mock(LeaseProcessPool);
when(mockLeasePool.drainAndClear()).thenResolve();
when(mockMarket.createLeaseProcessPool(_, _, _)).thenReturn(instance(mockLeasePool));
when(mockLease.createLeaseProcessPool(_, _, _)).thenReturn(instance(mockLeasePool));

const glm = getGolemNetwork();
await glm.connect();
Expand Down
17 changes: 15 additions & 2 deletions src/golem-network/golem-network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
} from "../shared/storage";
import { DataTransferProtocol } from "../shared/types";
import { NetworkApiAdapter } from "../shared/yagna/adapters/network-api-adapter";
import { LeaseModule, LeaseModuleImpl } from "../lease-process/lease.module";

export interface GolemNetworkOptions {
/**
Expand Down Expand Up @@ -70,6 +71,7 @@ export interface GolemNetworkOptions {
payment: PaymentModule;
activity: ActivityModule;
network: NetworkModule;
lease: LeaseModule;
}
>;
}
Expand Down Expand Up @@ -138,6 +140,7 @@ export class GolemNetwork {
public readonly payment: PaymentModule;
public readonly activity: ActivityModule;
public readonly network: NetworkModule;
public readonly lease: LeaseModule;

/**
* Dependency Container
Expand Down Expand Up @@ -220,6 +223,16 @@ export class GolemNetwork {
this.options.override?.market || new MarketModuleImpl({ ...this.services, networkModule: this.network });
this.payment = this.options.override?.payment || new PaymentModuleImpl(this.services, this.options.payment);
this.activity = this.options.override?.activity || new ActivityModuleImpl(this.services);
this.lease =
this.options.override?.lease ||
new LeaseModuleImpl({
activityModule: this.activity,
paymentModule: this.payment,
marketModule: this.market,
networkModule: this.network,
logger: this.logger,
storageProvider: this.storageProvider,
});
} catch (err) {
this.events.emit("error", err);
throw err;
Expand Down Expand Up @@ -301,7 +314,7 @@ export class GolemNetwork {
? await this.network.createNetworkNode(order.network, agreement.getProviderInfo().id)
: undefined;

const lease = this.market.createLease(agreement, allocation, {
const lease = this.lease.createLease(agreement, allocation, {
payment: order.payment,
activity: order.activity,
networkNode,
Expand Down Expand Up @@ -379,7 +392,7 @@ export class GolemNetwork {
});
const subscription = proposalPool.readFrom(proposal$);

const leaseProcessPool = this.market.createLeaseProcessPool(proposalPool, allocation, {
const leaseProcessPool = this.lease.createLeaseProcessPool(proposalPool, allocation, {
replicas: concurrency,
network: order.network,
leaseProcessOptions: {
Expand Down
1 change: 1 addition & 0 deletions src/lease-process/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from "./lease-process";
export * from "./lease-process-pool";
export * from "./lease.module";
21 changes: 11 additions & 10 deletions src/lease-process/lease-process-pool.test.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import type { Agreement } from "../market/agreement/agreement";
import { _, imock, instance, mock, reset, spy, verify, when } from "@johanblumenberg/ts-mockito";
import type { Agreement, IAgreementApi } from "../market/agreement/agreement";
import { LeaseProcess } from "./lease-process";
import { Allocation, IPaymentApi } from "../payment";
import { Allocation } from "../payment";
import type { MarketModule } from "../market";
import { DraftOfferProposalPool } from "../market";
import { LeaseProcessPool } from "./lease-process-pool";
import { type RequireAtLeastOne } from "../shared/utils/types";
import { NetworkModule } from "../network";
import { LeaseModule } from "./lease.module";
import { Logger } from "../shared/utils";

const agreementApi = imock<IAgreementApi>();
const paymentApi = imock<IPaymentApi>();
const allocation = mock(Allocation);
const proposalPool = mock(DraftOfferProposalPool);
const marketModule = imock<MarketModule>();
const networkModule = imock<NetworkModule>();
const leaseModule = imock<LeaseModule>();

function getMockLeaseProcess() {
return {
Expand All @@ -25,31 +26,31 @@ function getMockLeaseProcess() {

function getLeasePool(replicas: RequireAtLeastOne<{ min: number; max: number }>) {
return new LeaseProcessPool({
agreementApi: instance(agreementApi),
paymentApi: instance(paymentApi),
allocation: instance(allocation),
proposalPool: instance(proposalPool),
marketModule: instance(marketModule),
networkModule: instance(networkModule),
leaseModule: instance(leaseModule),
logger: instance(imock<Logger>()),
network: undefined,
replicas,
});
}

beforeEach(() => {
jest.clearAllMocks();
reset(agreementApi);
reset(paymentApi);
reset(allocation);
reset(proposalPool);
reset(marketModule);
reset(networkModule);
reset(leaseModule);
});

describe("LeaseProcessPool", () => {
describe("ready()", () => {
it("prepares MIN_POOL_SIZE lease processes", async () => {
when(marketModule.signAgreementFromPool(_)).thenResolve({} as Agreement);
when(marketModule.createLease(_, _, _)).thenCall(() => ({}) as LeaseProcess);
when(leaseModule.createLease(_, _, _)).thenCall(() => ({}) as LeaseProcess);

const pool = getLeasePool({ min: 5, max: 10 });

Expand All @@ -59,7 +60,7 @@ describe("LeaseProcessPool", () => {
verify(marketModule.signAgreementFromPool(_)).times(5);
});
it("retries on error", async () => {
when(marketModule.createLease(_, _, _)).thenCall(() => ({}) as LeaseProcess);
when(leaseModule.createLease(_, _, _)).thenCall(() => ({}) as LeaseProcess);

const fakeAgreement = {} as Agreement;
when(marketModule.signAgreementFromPool(_))
Expand Down
20 changes: 10 additions & 10 deletions src/lease-process/lease-process-pool.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import type { Agreement, IAgreementApi } from "../market/agreement/agreement";
import type { Agreement } from "../market/agreement/agreement";
import type { Logger } from "../shared/utils";
import { defaultLogger, createAbortSignalFromTimeout, runOnNextEventLoopIteration } from "../shared/utils";
import { createAbortSignalFromTimeout, runOnNextEventLoopIteration } from "../shared/utils";
import type { DraftOfferProposalPool, MarketModule } from "../market";
import { GolemMarketError, MarketErrorCode } from "../market";
import { EventEmitter } from "eventemitter3";
import type { RequireAtLeastOne } from "../shared/utils/types";
import type { Allocation, IPaymentApi } from "../payment";
import type { Allocation } from "../payment";
import type { LeaseProcess, LeaseProcessOptions } from "./lease-process";
import { Network, NetworkModule } from "../network";
import { LeaseModule } from "./lease.module";

export interface LeaseProcessPoolDependencies {
agreementApi: IAgreementApi;
paymentApi: IPaymentApi;
allocation: Allocation;
proposalPool: DraftOfferProposalPool;
marketModule: MarketModule;
networkModule: NetworkModule;
logger?: Logger;
leaseModule: LeaseModule;
logger: Logger;
}

export interface LeaseProcessPoolOptions {
Expand Down Expand Up @@ -58,24 +58,24 @@ export class LeaseProcessPool {

private allocation: Allocation;
private network?: Network;
private agreementApi: IAgreementApi;
private proposalPool: DraftOfferProposalPool;
private marketModule: MarketModule;
private networkModule: NetworkModule;
private leaseModule: LeaseModule;
private readonly minPoolSize: number;
private readonly maxPoolSize: number;
private readonly leaseProcessOptions?: LeaseProcessOptions;

constructor(options: LeaseProcessPoolOptions & LeaseProcessPoolDependencies) {
this.agreementApi = options.agreementApi;
this.allocation = options.allocation;
this.proposalPool = options.proposalPool;
this.marketModule = options.marketModule;
this.leaseModule = options.leaseModule;
this.networkModule = options.networkModule;
this.network = options.network;
this.leaseProcessOptions = options.leaseProcessOptions;

this.logger = this.logger = options?.logger || defaultLogger("lease-process-pool");
this.logger = options.logger;

this.minPoolSize =
(() => {
Expand All @@ -102,7 +102,7 @@ export class LeaseProcessPool {
const networkNode = this.network
? await this.networkModule.createNetworkNode(this.network, agreement.getProviderInfo().id)
: undefined;
const leaseProcess = this.marketModule.createLease(agreement, this.allocation, {
const leaseProcess = this.leaseModule.createLease(agreement, this.allocation, {
networkNode,
...this.leaseProcessOptions,
});
Expand Down
Loading

0 comments on commit 0987362

Please sign in to comment.