diff --git a/examples/market/scan.ts b/examples/market/scan.ts index 8ae4e8ebc..d63276c8c 100644 --- a/examples/market/scan.ts +++ b/examples/market/scan.ts @@ -1,9 +1,9 @@ /** - * This example demonstrates how to scan the market for proposals + * This example demonstrates how to scan the market for OfferProposals * Lets learn what is the average start price * Notice that we don't need to even allocate any budget for this operation */ -import { GolemNetwork, ProposalNew } from "@golem-sdk/golem-js"; +import { GolemNetwork, OfferProposal } from "@golem-sdk/golem-js"; (async () => { const glm = new GolemNetwork({ @@ -26,7 +26,7 @@ import { GolemNetwork, ProposalNew } from "@golem-sdk/golem-js"; payerDetails, ); - const offers = new Set(); + const offers = new Set(); console.log("Scanning the market..."); const subscription = glm.market @@ -35,12 +35,12 @@ import { GolemNetwork, ProposalNew } from "@golem-sdk/golem-js"; bufferSize: 5, }) .subscribe({ - next: (proposals) => { - console.log("Received a batch of ", proposals.length, " offers..."); - proposals.forEach((proposal) => offers.add(proposal)); + next: (OfferProposals) => { + console.log("Received a batch of ", OfferProposals.length, " offers..."); + OfferProposals.forEach((OfferProposal) => offers.add(OfferProposal)); }, error: (e) => { - console.error("Error while collecting proposals", e); + console.error("Error while collecting OfferProposals", e); }, }); diff --git a/src/agreement/agreement.ts b/src/agreement/agreement.ts index 134b93165..3c8467a60 100644 --- a/src/agreement/agreement.ts +++ b/src/agreement/agreement.ts @@ -1,6 +1,6 @@ import { Logger, YagnaOptions } from "../shared/utils"; import { MarketApi } from "ya-ts-client"; -import { ProposalNew } from "../market"; +import { OfferProposal } from "../market"; import { AgreementDTO } from "./service"; import { InvoiceFilter } from "../payment/service"; @@ -41,7 +41,7 @@ export interface IAgreementApi { * * @return An agreement that's in a "Proposal" state (not yet usable for activity creation) */ - createAgreement(proposal: ProposalNew): Promise; + createAgreement(proposal: OfferProposal): Promise; /** * Request creating an agreement from the provided proposal, send it to the Provider and wait for approval @@ -50,7 +50,7 @@ export interface IAgreementApi { * * @return An agreement that's already in an "Approved" state and can be used to create activities on the Provider */ - proposeAgreement(proposal: ProposalNew): Promise; + proposeAgreement(proposal: OfferProposal): Promise; // TODO: Detach return type from ya-ts-client! getAgreementState(id: string): Promise; diff --git a/src/agreement/service.ts b/src/agreement/service.ts index 922a06775..688309d43 100644 --- a/src/agreement/service.ts +++ b/src/agreement/service.ts @@ -2,7 +2,7 @@ import Bottleneck from "bottleneck"; import { defaultLogger, Logger, sleep, YagnaApi } from "../shared/utils"; import { Agreement, IAgreementApi, LegacyAgreementServiceOptions } from "./agreement"; import { AgreementServiceConfig } from "./config"; -import { GolemMarketError, MarketErrorCode, ProposalNew } from "../market"; +import { GolemMarketError, MarketErrorCode, OfferProposal } from "../market"; export interface AgreementDTO { id: string; @@ -12,7 +12,7 @@ export interface AgreementDTO { export class AgreementCandidate { agreement?: AgreementDTO; - constructor(readonly proposal: ProposalNew) {} + constructor(readonly proposal: OfferProposal) {} } export type AgreementSelector = (candidates: AgreementCandidate[]) => Promise; @@ -69,7 +69,7 @@ export class AgreementPoolService { * Add proposal for create agreement purposes * @param proposal Proposal */ - async addProposal(proposal: ProposalNew) { + async addProposal(proposal: OfferProposal) { // TODO: this.logger.debug(`New proposal added to pool`, { providerName: proposal.provider.name }); this.pool.add(new AgreementCandidate(proposal)); } diff --git a/src/experimental/reputation/system.ts b/src/experimental/reputation/system.ts index 6f30f71d3..b6a4b34d9 100644 --- a/src/experimental/reputation/system.ts +++ b/src/experimental/reputation/system.ts @@ -1,4 +1,4 @@ -import { ProposalFilterNew, ProposalNew } from "../../market"; +import { ProposalFilterNew, OfferProposal } from "../../market"; import { AgreementCandidate, AgreementSelector } from "../../agreement"; import { GolemReputationError } from "./error"; import { @@ -333,7 +333,7 @@ export class ReputationSystem { * @param opts */ proposalFilter(opts?: ProposalFilterOptions): ProposalFilterNew { - return (proposal: ProposalNew) => { + return (proposal: OfferProposal) => { // Filter out rejected operators. const operatorEntry = this.rejectedOperatorsMap.get(proposal.provider.walletAddress); if (operatorEntry) { diff --git a/src/golem-network.ts b/src/golem-network.ts index 88b213b83..69f05d07f 100644 --- a/src/golem-network.ts +++ b/src/golem-network.ts @@ -7,7 +7,7 @@ import { MarketApi, MarketModule, MarketModuleImpl, - ProposalNew, + OfferProposal, } from "./market"; import { PaymentModule, PaymentModuleImpl, PaymentModuleOptions } from "./payment"; import { ActivityModule, ActivityModuleImpl, IFileServer } from "./activity"; @@ -22,7 +22,7 @@ import { IAgreementApi } from "./agreement/agreement"; import { AgreementApiAdapter } from "./shared/yagna/adapters/agreement-api-adapter"; import { ProposalRepository } from "./shared/yagna/repository/proposal-repository"; import { CacheService } from "./shared/cache/CacheService"; -import { IProposalRepository } from "./market/proposal"; +import { IProposalRepository } from "./market/offer-proposal"; import { DemandRepository } from "./shared/yagna/repository/demand-repository"; import { IDemandRepository } from "./market/demand"; import { GftpServerAdapter } from "./shared/storage/GftpServerAdapter"; @@ -66,7 +66,7 @@ export type GolemServices = { activityApi: IActivityApi; agreementApi: IAgreementApi; marketApi: MarketApi; - proposalCache: CacheService; + proposalCache: CacheService; proposalRepository: IProposalRepository; demandRepository: IDemandRepository; fileServer: IFileServer; @@ -126,7 +126,7 @@ export class GolemNetwork { this.storageProvider = this.createStorageProvider(); const demandCache = new CacheService(); - const proposalCache = new CacheService(); + const proposalCache = new CacheService(); const demandRepository = new DemandRepository(this.yagna.market, demandCache); const proposalRepository = new ProposalRepository(this.yagna.market, proposalCache); diff --git a/src/market/api.ts b/src/market/api.ts index 1db4d10dd..eadb17c67 100644 --- a/src/market/api.ts +++ b/src/market/api.ts @@ -1,7 +1,7 @@ import { Observable } from "rxjs"; import { Demand, DemandSpecification } from "./demand"; import YaTsClient from "ya-ts-client"; -import { ProposalNew } from "./proposal"; +import { OfferProposal } from "./offer-proposal"; export type NewProposalEvent = YaTsClient.MarketApi.ProposalEventDTO; export type ProposalRejectedEvent = YaTsClient.MarketApi.ProposalRejectedEventDTO; @@ -30,5 +30,15 @@ export interface MarketApi { /** * Sends a counter-proposal to the given proposal. Returns the newly created counter-proposal. */ - counterProposal(receivedProposal: ProposalNew, specification: DemandSpecification): Promise; + counterProposal(receivedProposal: OfferProposal, specification: DemandSpecification): Promise; + + /** + * Sends a "reject" response for the proposal that was received from the Provider as part of the negotiation process + * + * On the protocol level this means that no further counter-proposals will be generated by the Requestor + * + * @param receivedProposal The proposal from the provider + * @param reason User readable reason that should be presented to the Provider + */ + rejectProposal(receivedProposal: OfferProposal, reason: string): Promise; } diff --git a/src/market/draft-offer-proposal-pool.test.ts b/src/market/draft-offer-proposal-pool.test.ts index 95dcb3b85..cb6b476ad 100644 --- a/src/market/draft-offer-proposal-pool.test.ts +++ b/src/market/draft-offer-proposal-pool.test.ts @@ -1,16 +1,16 @@ import { DraftOfferProposalPool } from "./draft-offer-proposal-pool"; import { instance, mock, when } from "@johanblumenberg/ts-mockito"; -import { ProposalNew } from "./index"; +import { OfferProposal } from "./index"; describe("Draft Offer Proposal Pool", () => { // GIVEN - const mockProposal = mock(ProposalNew); + const mockProposal = mock(OfferProposal); // Most of the time we're testing the case when the Proposal is in `Draft` status when(mockProposal.isDraft()).thenReturn(true); // NOTE: ts-mockito instance + JS Set.add() doesn't play along, 2x instance(mockProposal) produces "the same" value for (Set.add) - const secondMockProposal = mock(ProposalNew); + const secondMockProposal = mock(OfferProposal); // Most of the time we're testing the case when the Proposal is in `Draft` status when(secondMockProposal.isDraft()).thenReturn(true); @@ -41,7 +41,7 @@ describe("Draft Offer Proposal Pool", () => { it("Will throw an error if the proposal is not in Draft state", async () => { const pool = new DraftOfferProposalPool(); - const proposalMock = mock(ProposalNew); + const proposalMock = mock(OfferProposal); when(proposalMock.isDraft()).thenReturn(false); expect(() => pool.add(instance(proposalMock))).toThrow("Cannot add a non-draft proposal to the pool"); diff --git a/src/market/draft-offer-proposal-pool.ts b/src/market/draft-offer-proposal-pool.ts index ab47ecc39..871c3993f 100644 --- a/src/market/draft-offer-proposal-pool.ts +++ b/src/market/draft-offer-proposal-pool.ts @@ -1,12 +1,12 @@ -import { ProposalNew } from "./proposal"; +import { OfferProposal } from "./offer-proposal"; import AsyncLock from "async-lock"; import { EventEmitter } from "eventemitter3"; import { GolemMarketError, MarketErrorCode } from "./error"; import { defaultLogger, Logger, sleep } from "../shared/utils"; import { Observable, Subscription } from "rxjs"; -export type ProposalSelector = (proposals: ProposalNew[]) => ProposalNew; -export type ProposalValidator = (proposal: ProposalNew) => boolean; +export type ProposalSelector = (proposals: OfferProposal[]) => OfferProposal; +export type ProposalValidator = (proposal: OfferProposal) => boolean; export interface ProposalPoolOptions { /** @@ -39,10 +39,10 @@ export interface ProposalPoolOptions { } export interface ProposalPoolEvents { - added: (proposal: ProposalNew) => void; - removed: (proposal: ProposalNew) => void; - acquired: (proposal: ProposalNew) => void; - released: (proposal: ProposalNew) => void; + added: (proposal: OfferProposal) => void; + removed: (proposal: OfferProposal) => void; + acquired: (proposal: OfferProposal) => void; + released: (proposal: OfferProposal) => void; cleared: () => void; } @@ -70,20 +70,20 @@ export class DraftOfferProposalPool { private readonly acquireTimeoutSec: number = 30; /** {@link ProposalPoolOptions.selectProposal} */ - private readonly selectProposal: ProposalSelector = (proposals: ProposalNew[]) => proposals[0]; + private readonly selectProposal: ProposalSelector = (proposals: OfferProposal[]) => proposals[0]; /** {@link ProposalPoolOptions.validateProposal} */ - private readonly validateProposal: ProposalValidator = (proposal: ProposalNew) => proposal !== undefined; + private readonly validateProposal: ProposalValidator = (proposal: OfferProposal) => proposal !== undefined; /** * The proposals that were not yet leased to anyone and are available for lease */ - private available = new Set(); + private available = new Set(); /** * The proposal that were already leased to someone and shouldn't be leased again */ - private leased = new Set(); + private leased = new Set(); public constructor(private options?: ProposalPoolOptions) { if (options?.selectProposal) { @@ -107,7 +107,7 @@ export class DraftOfferProposalPool { /** * Pushes the provided proposal to the list of proposals available for lease */ - public add(proposal: ProposalNew) { + public add(proposal: OfferProposal) { if (!proposal.isDraft()) { this.logger.error("Cannot add a non-draft proposal to the pool", { proposalId: proposal.id }); throw new GolemMarketError("Cannot add a non-draft proposal to the pool", MarketErrorCode.InvalidProposal); @@ -121,11 +121,11 @@ export class DraftOfferProposalPool { * * This method will reject if no suitable proposal will be found within {@link DraftOfferProposalPool.acquireTimeoutSec} seconds. */ - public acquire(): Promise { + public acquire(): Promise { return this.lock.acquire( "proposal-pool", async () => { - let proposal: ProposalNew | null = null; + let proposal: OfferProposal | null = null; while (proposal === null) { // Try to get one @@ -160,7 +160,7 @@ export class DraftOfferProposalPool { * Validates if the proposal is still usable before putting it back to the list of available ones * @param proposal */ - public release(proposal: ProposalNew): Promise { + public release(proposal: OfferProposal): Promise { return this.lock.acquire("proposal-pool", () => { this.leased.delete(proposal); @@ -173,7 +173,7 @@ export class DraftOfferProposalPool { }); } - public remove(proposal: ProposalNew): Promise { + public remove(proposal: OfferProposal): Promise { return this.lock.acquire("proposal-pool", () => { if (this.leased.has(proposal)) { this.leased.delete(proposal); @@ -236,12 +236,12 @@ export class DraftOfferProposalPool { }); } - protected removeFromAvailable(proposal: ProposalNew): void { + protected removeFromAvailable(proposal: OfferProposal): void { this.available.delete(proposal); this.events.emit("removed", proposal); } - public readFrom(source: Observable): Subscription { + public readFrom(source: Observable): Subscription { return source.subscribe({ next: (proposalBatch) => proposalBatch.forEach((proposal) => this.add(proposal)), error: (e) => this.logger.error("Error while collecting proposals", e), diff --git a/src/market/index.ts b/src/market/index.ts index c5118965d..0e596e066 100644 --- a/src/market/index.ts +++ b/src/market/index.ts @@ -1,6 +1,6 @@ -export { ProposalFilterNew } from "./proposal"; +export { ProposalFilterNew } from "./offer-proposal"; export { Demand, BasicDemandPropertyConfig, DemandSpecification } from "./demand"; -export { Proposal, ProposalNew, ProposalDTO } from "./proposal"; +export { OfferProposal, ProposalDTO } from "./offer-proposal"; export * as ProposalFilterFactory from "./strategy"; export { GolemMarketError, MarketErrorCode } from "./error"; export * as MarketHelpers from "./helpers"; diff --git a/src/market/market.module.test.ts b/src/market/market.module.test.ts index 53ce4dd65..3cb22c930 100644 --- a/src/market/market.module.test.ts +++ b/src/market/market.module.test.ts @@ -4,7 +4,7 @@ import { MarketModuleImpl } from "./market.module"; import * as YaTsClient from "ya-ts-client"; import { Demand, DemandSpecification, IDemandRepository } from "./demand"; import { from, of, take, takeUntil, timer } from "rxjs"; -import { IProposalRepository, ProposalNew, ProposalProperties } from "./proposal"; +import { IProposalRepository, OfferProposal, ProposalProperties } from "./offer-proposal"; import { MarketApiAdapter } from "../shared/yagna/"; import { IActivityApi, IPaymentApi } from "../agreement"; import { IAgreementApi } from "../agreement/agreement"; @@ -201,10 +201,16 @@ describe("Market module", () => { }); describe("subscribeForProposals()", () => { - it("should filter out rejected proposals", (done) => { + it("should filter out proposals that are invalid (in terms of content)", (done) => { const mockDemand = instance(imock()); const mockProposalDTO = imock(); + when(mockProposalDTO.issuerId).thenReturn("issuer-id"); + when(mockProposalDTO.properties).thenReturn({ + "golem.com.usage.vector": ["golem.usage.duration_sec", "golem.usage.cpu_sec"], + "golem.com.pricing.model.linear.coeffs": [0.1, 0.1], + }); + const mockProposalEventSuccess: YaTsClient.MarketApi.ProposalEventDTO = { eventType: "ProposalEvent", eventDate: "0000-00-00", @@ -273,7 +279,7 @@ describe("Market module", () => { }, properties: proposalProperties, getEstimatedCost: () => 1, - } as ProposalNew; + } as OfferProposal; const proposal2 = { isInitial: () => true, isDraft: () => false, @@ -286,7 +292,7 @@ describe("Market module", () => { }, properties: proposalProperties, getEstimatedCost: () => 1, - } as ProposalNew; + } as OfferProposal; const proposal3 = { isInitial: () => false, isDraft: () => true, @@ -299,7 +305,7 @@ describe("Market module", () => { }, properties: proposalProperties, getEstimatedCost: () => 1, - } as ProposalNew; + } as OfferProposal; const proposal4 = { isInitial: () => false, isDraft: () => true, @@ -312,7 +318,7 @@ describe("Market module", () => { }, properties: proposalProperties, getEstimatedCost: () => 1, - } as ProposalNew; + } as OfferProposal; marketModule.publishDemand = jest.fn().mockReturnValue(of({ id: "demand-id" })); marketModule.negotiateProposal = jest.fn(); @@ -320,7 +326,7 @@ describe("Market module", () => { .fn() .mockReturnValue(from([proposal1, proposal2, proposal3, proposal4])); - const draftProposals: ProposalNew[] = []; + const draftProposals: OfferProposal[] = []; marketModule .startCollectingProposals({ demandSpecification, @@ -370,7 +376,7 @@ describe("Market module", () => { }, properties: proposalProperties, getEstimatedCost: () => 99, - } as ProposalNew; + } as OfferProposal; const proposal2 = { isInitial: () => true, isDraft: () => false, @@ -383,7 +389,7 @@ describe("Market module", () => { }, properties: proposalProperties, getEstimatedCost: () => 1, - } as ProposalNew; + } as OfferProposal; const proposal3 = { isInitial: () => true, isDraft: () => false, @@ -396,7 +402,7 @@ describe("Market module", () => { }, properties: proposalProperties, getEstimatedCost: () => 1, - } as ProposalNew; + } as OfferProposal; const proposal4 = { isInitial: () => false, isDraft: () => true, @@ -409,7 +415,7 @@ describe("Market module", () => { }, properties: proposalProperties, getEstimatedCost: () => 1, - } as ProposalNew; + } as OfferProposal; marketModule.publishDemand = jest.fn().mockReturnValue(of({ id: "demand-id" })); marketModule.negotiateProposal = jest.fn(); @@ -417,7 +423,7 @@ describe("Market module", () => { .fn() .mockReturnValue(from([proposal1, proposal2, proposal3, proposal4])); - const draftProposals: ProposalNew[] = []; + const draftProposals: OfferProposal[] = []; marketModule .startCollectingProposals({ demandSpecification, diff --git a/src/market/market.module.ts b/src/market/market.module.ts index efd36180f..a2c2d2802 100644 --- a/src/market/market.module.ts +++ b/src/market/market.module.ts @@ -11,8 +11,8 @@ import { import { Agreement, AgreementPool, AgreementPoolOptions, IActivityApi, IPaymentApi, LeaseProcess } from "../agreement"; import { defaultLogger, Logger, YagnaApi } from "../shared/utils"; import { Allocation } from "../payment"; -import { bufferTime, filter, map, Observable, OperatorFunction, switchMap, tap } from "rxjs"; -import { IProposalRepository, ProposalFilterNew, ProposalNew } from "./proposal"; +import { bufferTime, catchError, filter, map, mergeMap, Observable, of, OperatorFunction, switchMap, tap } from "rxjs"; +import { IProposalRepository, OfferProposal, ProposalFilterNew } from "./offer-proposal"; import { DemandBodyBuilder } from "./demand/demand-body-builder"; import { IAgreementApi } from "../agreement/agreement"; import { BuildDemandOptions, DemandSpecification, IDemandRepository } from "./demand"; @@ -103,13 +103,13 @@ export interface MarketModule { * If an error occurs, the observable will emit an error and complete. * Keep in mind that since this method returns an observable, nothing will happen until you subscribe to it. */ - subscribeForProposals(demand: Demand): Observable; + subscribeForProposals(demand: Demand): Observable; /** * Sends a counter-offer to the provider. Note that to get the provider's response to your * counter you should listen to proposals sent to yagna using `subscribeForProposals`. */ - negotiateProposal(receivedProposal: ProposalNew, counterDemandSpec: DemandSpecification): Promise; + negotiateProposal(receivedProposal: OfferProposal, counterDemandSpec: DemandSpecification): Promise; /** * Internally @@ -122,7 +122,7 @@ export interface MarketModule { * * @return Returns when the provider accepts the agreement, rejects otherwise. The resulting agreement is ready to create activities from. */ - proposeAgreement(proposal: ProposalNew): Promise; + proposeAgreement(proposal: OfferProposal): Promise; /** * @return The Agreement that has been terminated via Yagna @@ -147,7 +147,7 @@ export interface MarketModule { demandSpecification: DemandSpecification; filter?: ProposalFilterNew; bufferSize?: number; - }): Observable; + }): Observable; createLease(agreement: Agreement, allocation: Allocation): LeaseProcess; @@ -299,20 +299,30 @@ export class MarketModuleImpl implements MarketModule { }); } - subscribeForProposals(demand: Demand): Observable { + subscribeForProposals(demand: Demand): Observable { return this.deps.marketApi.observeProposalEvents(demand).pipe( + tap((event) => this.logger.debug("Received proposal event from yagna", { event })), // filter out proposal rejection events filter((event) => !("reason" in event)), - // map proposal events to proposal models - map((event) => new ProposalNew((event as NewProposalEvent).proposal, demand)), + // try to map the events to proposal entity, but be ready for validation errors + mergeMap((event) => { + return of(event).pipe( + map((event) => new OfferProposal((event as NewProposalEvent).proposal, demand)), + // in case of a validation error return a null value from the pipe + catchError((err) => { + this.logger.error("Failed to map the yagna proposal event to Proposal entity", err); + return of(); + }), + ); + }), ); } - async negotiateProposal(receivedProposal: ProposalNew, offer: DemandSpecification): Promise { + async negotiateProposal(receivedProposal: OfferProposal, offer: DemandSpecification): Promise { return this.deps.marketApi.counterProposal(receivedProposal, offer); } - async proposeAgreement(proposal: ProposalNew): Promise { + async proposeAgreement(proposal: OfferProposal): Promise { const agreement = await this.agreementApi.proposeAgreement(proposal); this.logger.info("Proposed and got approval for agreement", { @@ -350,7 +360,7 @@ export class MarketModuleImpl implements MarketModule { bufferTimeout?: number; minProposalsBatchSize?: number; proposalsBatchReleaseTimeoutMs?: number; - }): Observable { + }): Observable { return this.publishDemand(options.demandSpecification).pipe( // for each demand created -> start collecting all proposals switchMap((demand) => { @@ -406,7 +416,7 @@ export class MarketModuleImpl implements MarketModule { private reduceInitialProposalsByProviderKey(options?: { minProposalsBatchSize?: number; proposalsBatchReleaseTimeoutMs?: number; - }): OperatorFunction { + }): OperatorFunction { return (source) => new Observable((destination) => { let isCancelled = false; diff --git a/src/market/offer-proposal.ts b/src/market/offer-proposal.ts new file mode 100644 index 000000000..2691d4ae3 --- /dev/null +++ b/src/market/offer-proposal.ts @@ -0,0 +1,227 @@ +import { MarketApi } from "ya-ts-client"; +import { GolemMarketError, MarketErrorCode } from "./error"; +import { ProviderInfo } from "../agreement"; +import { Demand } from "./demand"; + +export type ProposalFilterNew = (proposal: OfferProposal) => boolean; + +export type PricingInfo = { + cpuSec: number; + envSec: number; + start: number; +}; + +// eslint-disable-next-line @typescript-eslint/naming-convention +export type ProposalProperties = Record & { + "golem.activity.caps.transfer.protocol": string[]; + "golem.com.payment.debit-notes.accept-timeout?": number; + "golem.com.payment.platform.erc20-polygon-glm.address"?: string; + "golem.com.payment.platform.erc20-holesky-tglm.address"?: string; + "golem.com.payment.platform.erc20-mumbai-tglm.address"?: string; + "golem.com.pricing.model": "linear"; + "golem.com.pricing.model.linear.coeffs": number[]; + "golem.com.scheme": string; + "golem.com.scheme.payu.debit-note.interval-sec?"?: number; + "golem.com.scheme.payu.payment-timeout-sec?"?: number; + "golem.com.usage.vector": string[]; + "golem.inf.cpu.architecture": string; + "golem.inf.cpu.brand": string; + "golem.inf.cpu.capabilities": string[]; + "golem.inf.cpu.cores": number; + "golem.inf.cpu.model": string; + "golem.inf.cpu.threads": number; + "golem.inf.cpu.vendor": string[]; + "golem.inf.mem.gib": number; + "golem.inf.storage.gib": number; + "golem.node.debug.subnet": string; + "golem.node.id.name": string; + "golem.node.net.is-public": boolean; + "golem.runtime.capabilities": string[]; + "golem.runtime.name": string; + "golem.runtime.version": string; + "golem.srv.caps.multi-activity": boolean; + "golem.srv.caps.payload-manifest": boolean; +}; + +export interface ProposalDTO { + transferProtocol: string[]; + cpuBrand: string; + cpuCapabilities: string[]; + cpuCores: number; + cpuThreads: number; + memory: number; + storage: number; + publicNet: boolean; + runtimeCapabilities: string[]; + runtimeName: string; + state: MarketApi.ProposalDTO["state"]; +} + +export interface IProposalRepository { + add(proposal: OfferProposal): OfferProposal; + getById(id: string): OfferProposal | undefined; + getByDemandAndId(demand: Demand, id: string): Promise; +} + +/** + * Entity representing the offer presented by the Provider to the Requestor + * + * Issue: The final proposal that gets promoted to an agreement comes from the provider + * Right now the last time I can acces it directly is when I receive the counter from the provider, + * later it's impossible for me to get it via the API `{"message":"Path deserialize error: Id [2cb0b2820c6142fab5af7a8e90da09f0] has invalid owner type."}` + * + * FIXME #yagna should allow obtaining proposals via the API even if I'm not the owner! + */ +export class OfferProposal { + public readonly id: string; + public provider: ProviderInfo; + public readonly previousProposalId: string | null = null; + + constructor( + public readonly model: MarketApi.ProposalDTO, + public readonly demand: Demand, + ) { + this.id = model.proposalId; + this.provider = this.getProviderInfo(); + this.previousProposalId = model.prevProposalId ?? null; + + this.validate(); + } + + /** + * Validates if the proposal satisfies basic business rules, is complete and thus safe to interact with + * + * Use this method before executing any important logic, to ensure that you're working with correct, complete data + */ + protected validate(): void | never { + const usageVector = this.properties["golem.com.usage.vector"]; + const priceVector = this.properties["golem.com.pricing.model.linear.coeffs"]; + + if (!usageVector || usageVector.length === 0) { + throw new GolemMarketError( + "Broken proposal: the `golem.com.usage.vector` does not contain price information", + MarketErrorCode.InvalidProposal, + ); + } + + if (!priceVector || priceVector.length === 0) { + throw new GolemMarketError( + "Broken proposal: the `golem.com.pricing.model.linear.coeffs` does not contain pricing information", + MarketErrorCode.InvalidProposal, + ); + } + + if (usageVector.length < priceVector.length - 1) { + throw new GolemMarketError( + "Broken proposal: the `golem.com.usage.vector` has less pricing information than `golem.com.pricing.model.linear.coeffs`", + MarketErrorCode.InvalidProposal, + ); + } + + if (priceVector.length < usageVector.length) { + throw new GolemMarketError( + "Broken proposal: the `golem.com.pricing.model.linear.coeffs` should contain 3 price values", + MarketErrorCode.InvalidProposal, + ); + } + } + + isInitial(): boolean { + return this.model.state === "Initial"; + } + + isDraft(): boolean { + return this.model.state === "Draft"; + } + + isExpired(): boolean { + return this.model.state === "Expired"; + } + + isRejected(): boolean { + return this.model.state === "Rejected"; + } + + public get properties(): ProposalProperties { + return this.model.properties as ProposalProperties; + } + + public get state(): MarketApi.ProposalDTO["state"] { + return this.model.state; + } + + public get timestamp(): string { + return this.model.timestamp; + } + + getDto(): ProposalDTO { + return { + transferProtocol: this.properties["golem.activity.caps.transfer.protocol"], + cpuBrand: this.properties["golem.inf.cpu.brand"], + cpuCapabilities: this.properties["golem.inf.cpu.capabilities"], + cpuCores: this.properties["golem.inf.cpu.cores"], + cpuThreads: this.properties["golem.inf.cpu.threads"], + memory: this.properties["golem.inf.mem.gib"], + storage: this.properties["golem.inf.storage.gib"], + publicNet: this.properties["golem.node.net.is-public"], + runtimeCapabilities: this.properties["golem.runtime.capabilities"], + runtimeName: this.properties["golem.runtime.name"], + state: this.state, + }; + } + + get pricing(): PricingInfo { + const usageVector = this.properties["golem.com.usage.vector"]; + const priceVector = this.properties["golem.com.pricing.model.linear.coeffs"]; + + const envIdx = usageVector.findIndex((ele) => ele === "golem.usage.duration_sec"); + const cpuIdx = usageVector.findIndex((ele) => ele === "golem.usage.cpu_sec"); + + const envSec = priceVector[envIdx] ?? 0.0; + const cpuSec = priceVector[cpuIdx] ?? 0.0; + const start = priceVector[priceVector.length - 1]; + + return { + cpuSec, + envSec, + start, + }; + } + + /** + * Proposal cost estimation based on CPU, Env and startup costs + */ + getEstimatedCost(): number { + const threadsNo = this.properties["golem.inf.cpu.threads"] || 1; + return this.pricing.start + this.pricing.cpuSec * threadsNo + this.pricing.envSec; + } + + public isValid(): boolean { + try { + this.validate(); + return true; + } catch (err) { + return false; + } + } + + public getProviderInfo(): ProviderInfo { + return { + id: this.model.issuerId, + name: this.properties["golem.node.id.name"], + walletAddress: this.properties[`golem.com.payment.platform.${this.demand.paymentPlatform}.address`] as string, + }; + } + + hasPaymentPlatform(paymentPlatform: string): boolean { + return this.getProviderPaymentPlatforms().includes(paymentPlatform); + } + + private getProviderPaymentPlatforms(): string[] { + return ( + Object.keys(this.properties) + .filter((prop) => prop.startsWith("golem.com.payment.platform.")) + .map((prop) => prop.split(".")[4]) || [] + ); + } +} diff --git a/src/market/proposal.test.ts b/src/market/proposal.test.ts index 8994dad89..5f3abc348 100644 --- a/src/market/proposal.test.ts +++ b/src/market/proposal.test.ts @@ -1,5 +1,5 @@ import { MarketApi } from "ya-ts-client"; -import { Proposal, ProposalProperties } from "./proposal"; +import { OfferProposal, ProposalProperties } from "./offer-proposal"; import { Demand } from "./demand"; import { instance, mock, reset, when } from "@johanblumenberg/ts-mockito"; import { Allocation } from "../payment"; @@ -7,11 +7,10 @@ import { GolemMarketError, MarketErrorCode } from "./error"; const allocationMock = mock(Allocation); const demandMock = mock(Demand); -const mockApi = mock(MarketApi.RequestorService); const testDemand = instance(demandMock); -const buildTestProposal = (props: Partial): Proposal => { +const buildTestProposal = (props: Partial): OfferProposal => { const model: MarketApi.ProposalDTO = { constraints: "", issuerId: "", @@ -21,10 +20,10 @@ const buildTestProposal = (props: Partial): Proposal => { properties: props, }; - return new Proposal(testDemand, null, jest.fn(), instance(mockApi), model); + return new OfferProposal(model, testDemand); }; -describe.skip("DEPRECATED Proposal", () => { +describe("Proposal", () => { beforeEach(() => { reset(allocationMock); reset(demandMock); diff --git a/src/market/proposal.ts b/src/market/proposal.ts deleted file mode 100644 index 169e67d6e..000000000 --- a/src/market/proposal.ts +++ /dev/null @@ -1,440 +0,0 @@ -import { MarketApi } from "ya-ts-client"; -import { GolemMarketError, MarketErrorCode } from "./error"; -import { ProviderInfo } from "../agreement"; -import { Demand } from "./demand"; -import { withTimeout } from "../shared/utils/timeout"; -import { EventEmitter } from "eventemitter3"; -import { DemandBodyPrototype, DemandPropertyValue } from "./demand/demand-body-builder"; -import { DemandRequestBody } from "../shared/yagna"; - -export type ProposalFilterNew = (proposal: ProposalNew) => boolean; - -export interface ProposalEvents { - proposalResponded: (details: { id: string; provider: ProviderInfo; counteringProposalId: string }) => void; - proposalRejected: (details: { id: string; provider: ProviderInfo; parentId: string | null; reason: string }) => void; - proposalFailed: (details: { id: string; provider: ProviderInfo; parentId: string | null; reason: string }) => void; -} - -export type PricingInfo = { - cpuSec: number; - envSec: number; - start: number; -}; - -// eslint-disable-next-line @typescript-eslint/naming-convention -export type ProposalProperties = Record & { - "golem.activity.caps.transfer.protocol": string[]; - "golem.com.payment.debit-notes.accept-timeout?": number; - "golem.com.payment.platform.erc20-polygon-glm.address"?: string; - "golem.com.payment.platform.erc20-holesky-tglm.address"?: string; - "golem.com.payment.platform.erc20-mumbai-tglm.address"?: string; - "golem.com.pricing.model": "linear"; - "golem.com.pricing.model.linear.coeffs": number[]; - "golem.com.scheme": string; - "golem.com.scheme.payu.debit-note.interval-sec?"?: number; - "golem.com.scheme.payu.payment-timeout-sec?"?: number; - "golem.com.usage.vector": string[]; - "golem.inf.cpu.architecture": string; - "golem.inf.cpu.brand": string; - "golem.inf.cpu.capabilities": string[]; - "golem.inf.cpu.cores": number; - "golem.inf.cpu.model": string; - "golem.inf.cpu.threads": number; - "golem.inf.cpu.vendor": string[]; - "golem.inf.mem.gib": number; - "golem.inf.storage.gib": number; - "golem.node.debug.subnet": string; - "golem.node.id.name": string; - "golem.node.net.is-public": boolean; - "golem.runtime.capabilities": string[]; - "golem.runtime.name": string; - "golem.runtime.version": string; - "golem.srv.caps.multi-activity": boolean; - "golem.srv.caps.payload-manifest": boolean; -}; - -export interface ProposalDTO { - transferProtocol: string[]; - cpuBrand: string; - cpuCapabilities: string[]; - cpuCores: number; - cpuThreads: number; - memory: number; - storage: number; - publicNet: boolean; - runtimeCapabilities: string[]; - runtimeName: string; - state: MarketApi.ProposalDTO["state"]; -} - -export interface IProposalRepository { - add(proposal: ProposalNew): ProposalNew; - getById(id: string): ProposalNew | undefined; - getByDemandAndId(demand: Demand, id: string): Promise; -} - -/** - * Issue: The final proposal that gets promoted to an agreement comes from the provider - * Right now the last time I can acces it directly is when I receive the counter from the provider, - * later it's impossible for me to get it via the API `{"message":"Path deserialize error: Id [2cb0b2820c6142fab5af7a8e90da09f0] has invalid owner type."}` - * - * FIXME #yagna should allow obtaining proposals via the API even if I'm not the owner! - */ -export class ProposalNew { - public readonly id: string; - public provider: ProviderInfo; - public readonly previousProposalId: string | null = null; - - constructor( - public readonly model: MarketApi.ProposalDTO, - public readonly demand: Demand, - ) { - this.id = model.proposalId; - this.provider = this.getProviderInfo(); - this.previousProposalId = model.prevProposalId ?? null; - } - - isInitial(): boolean { - return this.model.state === "Initial"; - } - - isDraft(): boolean { - return this.model.state === "Draft"; - } - - isExpired(): boolean { - return this.model.state === "Expired"; - } - - isRejected(): boolean { - return this.model.state === "Rejected"; - } - - public get properties(): ProposalProperties { - return this.model.properties as ProposalProperties; - } - - public get state(): MarketApi.ProposalDTO["state"] { - return this.model.state; - } - - public get timestamp(): string { - return this.model.timestamp; - } - - getDto(): ProposalDTO { - return { - transferProtocol: this.properties["golem.activity.caps.transfer.protocol"], - cpuBrand: this.properties["golem.inf.cpu.brand"], - cpuCapabilities: this.properties["golem.inf.cpu.capabilities"], - cpuCores: this.properties["golem.inf.cpu.cores"], - cpuThreads: this.properties["golem.inf.cpu.threads"], - memory: this.properties["golem.inf.mem.gib"], - storage: this.properties["golem.inf.storage.gib"], - publicNet: this.properties["golem.node.net.is-public"], - runtimeCapabilities: this.properties["golem.runtime.capabilities"], - runtimeName: this.properties["golem.runtime.name"], - state: this.state, - }; - } - - get pricing(): PricingInfo { - const usageVector = this.properties["golem.com.usage.vector"]; - const priceVector = this.properties["golem.com.pricing.model.linear.coeffs"]; - - const envIdx = usageVector.findIndex((ele) => ele === "golem.usage.duration_sec"); - const cpuIdx = usageVector.findIndex((ele) => ele === "golem.usage.cpu_sec"); - - const envSec = priceVector[envIdx] ?? 0.0; - const cpuSec = priceVector[cpuIdx] ?? 0.0; - const start = priceVector[priceVector.length - 1]; - - return { - cpuSec, - envSec, - start, - }; - } - - /** - * Proposal cost estimation based on CPU, Env and startup costs - */ - getEstimatedCost(): number { - const threadsNo = this.properties["golem.inf.cpu.threads"] || 1; - return this.pricing.start + this.pricing.cpuSec * threadsNo + this.pricing.envSec; - } - - public isValid(): boolean { - const usageVector = this.properties["golem.com.usage.vector"]; - const priceVector = this.properties["golem.com.pricing.model.linear.coeffs"]; - - if (!usageVector || usageVector.length === 0) { - return false; - } - - if (!priceVector || priceVector.length === 0) { - return false; - } - - if (usageVector.length < priceVector.length - 1) { - return false; - } - - if (priceVector.length < usageVector.length) { - return false; - } - - return true; - } - - public getProviderInfo(): ProviderInfo { - return { - id: this.model.issuerId, - name: this.properties["golem.node.id.name"], - walletAddress: this.properties[`golem.com.payment.platform.${this.demand.paymentPlatform}.address`] as string, - }; - } -} - -/** - * Proposal module - an object representing an offer in the state of a proposal from the provider. - * @deprecated - */ -export class Proposal { - id: string; - readonly issuerId: string; - readonly provider: ProviderInfo; - readonly properties: ProposalProperties; - readonly constraints: string; - readonly timestamp: string; - counteringProposalId: string | null; - private readonly state: MarketApi.ProposalDTO["state"]; - private readonly prevProposalId: string | undefined; - public readonly events = new EventEmitter(); - - /** - * Create proposal for given subscription ID - * - * @param subscription - * @param parentId - * @param setCounteringProposalReference - * @param api - * @param model - */ - constructor( - public readonly subscription: Demand, - private readonly parentId: string | null, - private readonly setCounteringProposalReference: (id: string, parentId: string) => void | null, - private readonly api: MarketApi.RequestorService, - public readonly model: MarketApi.ProposalDTO, - ) { - this.id = model.proposalId; - this.issuerId = model.issuerId; - this.properties = model.properties as ProposalProperties; - this.constraints = model.constraints; - this.state = model.state; - this.prevProposalId = model.prevProposalId; - this.timestamp = model.timestamp; - this.counteringProposalId = null; - this.provider = this.getProviderInfo(); - - // Run validation to ensure that the Proposal is in a complete and correct state - this.validate(); - } - - getDto(): ProposalDTO { - return { - transferProtocol: this.properties["golem.activity.caps.transfer.protocol"], - cpuBrand: this.properties["golem.inf.cpu.brand"], - cpuCapabilities: this.properties["golem.inf.cpu.capabilities"], - cpuCores: this.properties["golem.inf.cpu.cores"], - cpuThreads: this.properties["golem.inf.cpu.threads"], - memory: this.properties["golem.inf.mem.gib"], - storage: this.properties["golem.inf.storage.gib"], - publicNet: this.properties["golem.node.net.is-public"], - runtimeCapabilities: this.properties["golem.runtime.capabilities"], - runtimeName: this.properties["golem.runtime.name"], - state: this.state, - }; - } - - get pricing(): PricingInfo { - const usageVector = this.properties["golem.com.usage.vector"]; - const priceVector = this.properties["golem.com.pricing.model.linear.coeffs"]; - - const envIdx = usageVector.findIndex((ele) => ele === "golem.usage.duration_sec"); - const cpuIdx = usageVector.findIndex((ele) => ele === "golem.usage.cpu_sec"); - - const envSec = priceVector[envIdx] ?? 0.0; - const cpuSec = priceVector[cpuIdx] ?? 0.0; - const start = priceVector[priceVector.length - 1]; - - return { - cpuSec, - envSec, - start, - }; - } - - /** - * Validates if the proposal satisfies basic business rules, is complete and thus safe to interact with - * - * Use this method before executing any important logic, to ensure that you're working with correct, complete data - */ - protected validate(): void | never { - const usageVector = this.properties["golem.com.usage.vector"]; - const priceVector = this.properties["golem.com.pricing.model.linear.coeffs"]; - - if (!usageVector || usageVector.length === 0) { - throw new GolemMarketError( - "Broken proposal: the `golem.com.usage.vector` does not contain price information", - MarketErrorCode.InvalidProposal, - ); - } - - if (!priceVector || priceVector.length === 0) { - throw new GolemMarketError( - "Broken proposal: the `golem.com.pricing.model.linear.coeffs` does not contain pricing information", - MarketErrorCode.InvalidProposal, - ); - } - - if (usageVector.length < priceVector.length - 1) { - throw new GolemMarketError( - "Broken proposal: the `golem.com.usage.vector` has less pricing information than `golem.com.pricing.model.linear.coeffs`", - MarketErrorCode.InvalidProposal, - ); - } - - if (priceVector.length < usageVector.length) { - throw new GolemMarketError( - "Broken proposal: the `golem.com.pricing.model.linear.coeffs` should contain 3 price values", - MarketErrorCode.InvalidProposal, - ); - } - } - - isInitial(): boolean { - return this.state === "Initial"; - } - - isDraft(): boolean { - return this.state === "Draft"; - } - - isExpired(): boolean { - return this.state === "Expired"; - } - - isRejected(): boolean { - return this.state === "Rejected"; - } - - async reject(reason = "no reason") { - try { - // eslint-disable-next-line @typescript-eslint/ban-types - await this.api.rejectProposalOffer(this.subscription.id, this.id, { message: reason as {} }); - this.events.emit("proposalRejected", { - id: this.id, - provider: this.provider, - parentId: this.id, - reason, - }); - } catch (error) { - throw new GolemMarketError( - `Failed to reject proposal. ${error?.response?.data?.message || error}`, - MarketErrorCode.ProposalRejectionFailed, - error, - ); - } - } - - async respond(chosenPlatform: string) { - try { - this.buildDemandRequestBody(this.subscription.details.prototype).properties["golem.com.payment.chosen-platform"] = - chosenPlatform; - - const counteringProposalId = await withTimeout( - this.api.counterProposalDemand( - this.subscription.id, - this.id, - this.buildDemandRequestBody(this.subscription.details.prototype), - ), - 20_000, - ); - - if (!counteringProposalId || typeof counteringProposalId !== "string") { - throw new GolemMarketError( - "Failed to respond proposal. No countering proposal ID returned", - MarketErrorCode.ProposalResponseFailed, - ); - } - if (this.setCounteringProposalReference) { - this.setCounteringProposalReference(this.id, counteringProposalId); - } - this.events.emit("proposalResponded", { - id: this.id, - provider: this.provider, - counteringProposalId, - }); - return counteringProposalId; - } catch (error) { - const reason = error?.response?.data?.message || error.toString(); - this.events.emit("proposalFailed", { - id: this.id, - provider: this.provider, - parentId: this.id, - reason, - }); - throw new GolemMarketError( - `Failed to respond proposal. ${reason}`, - MarketErrorCode.ProposalResponseFailed, - error, - ); - } - } - - hasPaymentPlatform(paymentPlatform: string): boolean { - return this.getProviderPaymentPlatforms().includes(paymentPlatform); - } - - /** - * Proposal cost estimation based on CPU, Env and startup costs - */ - getEstimatedCost(): number { - const threadsNo = this.properties["golem.inf.cpu.threads"] || 1; - return this.pricing.start + this.pricing.cpuSec * threadsNo + this.pricing.envSec; - } - - private getProviderPaymentPlatforms(): string[] { - return ( - Object.keys(this.properties) - .filter((prop) => prop.startsWith("golem.com.payment.platform.")) - .map((prop) => prop.split(".")[4]) || [] - ); - } - - private getProviderInfo(): ProviderInfo { - return { - id: this.issuerId, - name: this.properties["golem.node.id.name"], - walletAddress: this.properties[ - `golem.com.payment.platform.${this.subscription.paymentPlatform}.address` - ] as string, - }; - } - - /** @deprecated Glue code for migration purposes */ - private buildDemandRequestBody(decorations: DemandBodyPrototype): DemandRequestBody { - let constraints: string; - - if (!decorations.constraints.length) constraints = "(&)"; - else if (decorations.constraints.length == 1) constraints = decorations.constraints[0]; - else constraints = `(&${decorations.constraints.join("\n\t")})`; - - const properties: Record = {}; - decorations.properties.forEach((prop) => (properties[prop.key] = prop.value)); - - return { constraints, properties }; - } -} diff --git a/src/market/proposals_batch.test.ts b/src/market/proposals_batch.test.ts index 0906b5dc8..90a89d081 100644 --- a/src/market/proposals_batch.test.ts +++ b/src/market/proposals_batch.test.ts @@ -1,6 +1,6 @@ import { ProposalsBatch } from "./proposals_batch"; import { mock, instance, when } from "@johanblumenberg/ts-mockito"; -import { ProposalNew, ProposalProperties } from "./proposal"; +import { OfferProposal, ProposalProperties } from "./offer-proposal"; import { ProviderInfo } from "../agreement"; const mockedProviderInfo: ProviderInfo = { @@ -13,7 +13,7 @@ describe("ProposalsBatch", () => { describe("Adding Proposals", () => { it("should add the proposal to the batch from new provider", async () => { const proposalsBatch = new ProposalsBatch({ minBatchSize: 1 }); - const mockedProposal = mock(ProposalNew); + const mockedProposal = mock(OfferProposal); when(mockedProposal.provider).thenReturn(mockedProviderInfo); when(mockedProposal.properties).thenReturn({ ["golem.inf.cpu.cores"]: 1, @@ -28,7 +28,7 @@ describe("ProposalsBatch", () => { }); it("should not add the proposal to the batch from the existing provider and the same hardware configuration", async () => { const proposalsBatch = new ProposalsBatch({ releaseTimeoutMs: 100 }); - const mockedProposal = mock(ProposalNew); + const mockedProposal = mock(OfferProposal); when(mockedProposal.provider).thenReturn(mockedProviderInfo); when(mockedProposal.properties).thenReturn({ ["golem.inf.cpu.cores"]: 1, @@ -46,7 +46,7 @@ describe("ProposalsBatch", () => { it("should add the proposal to the batch from the existing provider and different hardware configuration", async () => { const proposalsBatch = new ProposalsBatch({ minBatchSize: 2 }); - const mockedProposal1 = mock(ProposalNew); + const mockedProposal1 = mock(OfferProposal); when(mockedProposal1.provider).thenReturn(mockedProviderInfo); when(mockedProposal1.properties).thenReturn({ ["golem.inf.cpu.cores"]: 1, @@ -54,7 +54,7 @@ describe("ProposalsBatch", () => { ["golem.inf.mem.gib"]: 1, ["golem.inf.storage.gib"]: 1, } as ProposalProperties); - const mockedProposal2 = mock(ProposalNew); + const mockedProposal2 = mock(OfferProposal); when(mockedProposal2.provider).thenReturn(mockedProviderInfo); when(mockedProposal2.properties).thenReturn({ ["golem.inf.cpu.cores"]: 77, @@ -76,7 +76,7 @@ describe("ProposalsBatch", () => { describe("Reading Proposals", () => { it("should read the set of proposals grouped by provider key distinguished by provider id, cpu, threads, memory and storage", async () => { const proposalsBatch = new ProposalsBatch({ releaseTimeoutMs: 100 }); - const mockedProposal1 = mock(ProposalNew); + const mockedProposal1 = mock(OfferProposal); when(mockedProposal1.provider).thenReturn(mockedProviderInfo); when(mockedProposal1.properties).thenReturn({ ["golem.inf.cpu.cores"]: 1, @@ -84,7 +84,7 @@ describe("ProposalsBatch", () => { ["golem.inf.mem.gib"]: 1, ["golem.inf.storage.gib"]: 1, } as ProposalProperties); - const mockedProposal2 = mock(ProposalNew); + const mockedProposal2 = mock(OfferProposal); when(mockedProposal2.provider).thenReturn(mockedProviderInfo); when(mockedProposal2.properties).thenReturn({ ["golem.inf.cpu.cores"]: 1, @@ -92,7 +92,7 @@ describe("ProposalsBatch", () => { ["golem.inf.mem.gib"]: 1, ["golem.inf.storage.gib"]: 1, } as ProposalProperties); - const mockedProposal3 = mock(ProposalNew); + const mockedProposal3 = mock(OfferProposal); when(mockedProposal3.provider).thenReturn(mockedProviderInfo); when(mockedProposal3.properties).thenReturn({ ["golem.inf.cpu.cores"]: 1, @@ -100,7 +100,7 @@ describe("ProposalsBatch", () => { ["golem.inf.mem.gib"]: 1, ["golem.inf.storage.gib"]: 1, } as ProposalProperties); - const mockedProposal4 = mock(ProposalNew); + const mockedProposal4 = mock(OfferProposal); when(mockedProposal4.provider).thenReturn(mockedProviderInfo); when(mockedProposal4.properties).thenReturn({ ["golem.inf.cpu.cores"]: 1, @@ -108,7 +108,7 @@ describe("ProposalsBatch", () => { ["golem.inf.mem.gib"]: 77, ["golem.inf.storage.gib"]: 1, } as ProposalProperties); - const mockedProposal5 = mock(ProposalNew); + const mockedProposal5 = mock(OfferProposal); when(mockedProposal5.provider).thenReturn(mockedProviderInfo); when(mockedProposal5.properties).thenReturn({ ["golem.inf.cpu.cores"]: 1, @@ -116,7 +116,7 @@ describe("ProposalsBatch", () => { ["golem.inf.mem.gib"]: 1, ["golem.inf.storage.gib"]: 77, } as ProposalProperties); - const mockedProposal6 = mock(ProposalNew); + const mockedProposal6 = mock(OfferProposal); when(mockedProposal6.provider).thenReturn({ id: "provider-77" } as ProviderInfo); when(mockedProposal6.properties).thenReturn({ ["golem.inf.cpu.cores"]: 1, @@ -150,7 +150,7 @@ describe("ProposalsBatch", () => { }); it("should read the set of proposal grouped by provider key and reduced proposals from teh same provider to the lowest price and highest time", async () => { const proposalsBatch = new ProposalsBatch({ releaseTimeoutMs: 100 }); - const mockedProposal1 = mock(ProposalNew); + const mockedProposal1 = mock(OfferProposal); when(mockedProposal1.provider).thenReturn(mockedProviderInfo); when(mockedProposal1.properties).thenReturn({ ["golem.inf.cpu.cores"]: 1, @@ -164,7 +164,7 @@ describe("ProposalsBatch", () => { start: 1, }); when(mockedProposal1.timestamp).thenReturn("2024-01-01T00:00:00.000Z"); - const mockedProposal2 = mock(ProposalNew); + const mockedProposal2 = mock(OfferProposal); when(mockedProposal2.provider).thenReturn(mockedProviderInfo); when(mockedProposal2.properties).thenReturn({ ["golem.inf.cpu.cores"]: 1, @@ -178,7 +178,7 @@ describe("ProposalsBatch", () => { start: 1, }); when(mockedProposal2.timestamp).thenReturn("2024-01-01T07:07:07.007Z"); - const mockedProposal3 = mock(ProposalNew); + const mockedProposal3 = mock(OfferProposal); when(mockedProposal3.provider).thenReturn(mockedProviderInfo); when(mockedProposal3.properties).thenReturn({ ["golem.inf.cpu.cores"]: 1, @@ -208,7 +208,7 @@ describe("ProposalsBatch", () => { }); it("should drain batch after reading proposals", async () => { const proposalsBatch = new ProposalsBatch({ releaseTimeoutMs: 100 }); - const mockedProposal = mock(ProposalNew); + const mockedProposal = mock(OfferProposal); when(mockedProposal.provider).thenReturn(mockedProviderInfo); when(mockedProposal.properties).thenReturn({ ["golem.inf.cpu.cores"]: 1, diff --git a/src/market/proposals_batch.ts b/src/market/proposals_batch.ts index 2669b0185..ca5193bf1 100644 --- a/src/market/proposals_batch.ts +++ b/src/market/proposals_batch.ts @@ -1,4 +1,4 @@ -import { ProposalNew } from "./proposal"; +import { OfferProposal } from "./offer-proposal"; import AsyncLock from "async-lock"; export type ProposalsBatchOptions = { @@ -19,7 +19,7 @@ const DEFAULTS = { */ export class ProposalsBatch { /** Batch of proposals mapped by provider key and related set of initial proposals */ - private batch = new Map>(); + private batch = new Map>(); /** Lock used to synchronize adding and getting proposals from the batch */ private lock: AsyncLock = new AsyncLock(); private config: Required; @@ -35,12 +35,12 @@ export class ProposalsBatch { * Add proposal to the batch grouped by provider key * which consist of providerId, cores, threads, mem and storage */ - async addProposal(proposal: ProposalNew) { + async addProposal(proposal: OfferProposal) { const providerKey = this.getProviderKey(proposal); await this.lock.acquire("proposals-batch", () => { let proposals = this.batch.get(providerKey); if (!proposals) { - proposals = new Set(); + proposals = new Set(); this.batch.set(providerKey, proposals); } proposals.add(proposal); @@ -51,7 +51,7 @@ export class ProposalsBatch { * Returns the batched proposals from the internal buffer and empties it */ public async getProposals() { - const proposals: ProposalNew[] = []; + const proposals: OfferProposal[] = []; await this.lock.acquire("proposals-batch", () => { this.batch.forEach((providersProposals) => proposals.push(this.getBestProposal(providersProposals))); @@ -90,8 +90,8 @@ export class ProposalsBatch { /** * Selects the best proposal from the set according to the lowest price and the youngest proposal age */ - private getBestProposal(proposals: Set): ProposalNew { - const sortByLowerPriceAndHigherTime = (p1: ProposalNew, p2: ProposalNew) => { + private getBestProposal(proposals: Set): OfferProposal { + const sortByLowerPriceAndHigherTime = (p1: OfferProposal, p2: OfferProposal) => { const p1Price = p1.getEstimatedCost(); const p2Price = p2.getEstimatedCost(); const p1Time = new Date(p1.timestamp).valueOf(); @@ -104,7 +104,7 @@ export class ProposalsBatch { /** * Provider key used to group proposals so that they can be distinguished based on ID and hardware configuration */ - private getProviderKey(proposal: ProposalNew): string { + private getProviderKey(proposal: OfferProposal): string { return [ proposal.provider.id, proposal.properties["golem.inf.cpu.cores"], diff --git a/src/market/strategy.test.ts b/src/market/strategy.test.ts index 32f946e5b..a4d894a51 100644 --- a/src/market/strategy.test.ts +++ b/src/market/strategy.test.ts @@ -1,5 +1,5 @@ import { instance, mock, reset, when } from "@johanblumenberg/ts-mockito"; -import { Proposal } from "./proposal"; +import { OfferProposal } from "./offer-proposal"; import { acceptAll, allowProvidersById, @@ -8,7 +8,7 @@ import { disallowProvidersByNameRegex, } from "./strategy"; -const mockProposal = mock(Proposal); +const mockProposal = mock(OfferProposal); describe("SDK provided proposal filters", () => { beforeEach(() => { @@ -40,8 +40,8 @@ describe("SDK provided proposal filters", () => { describe("disallowProvidersById", () => { test("Accepts only the providers with the name not listed on the blacklist", () => { - const p1 = mock(Proposal); - const p2 = mock(Proposal); + const p1 = mock(OfferProposal); + const p2 = mock(OfferProposal); when(p1.provider).thenReturn({ id: "provider-1", @@ -64,8 +64,8 @@ describe("SDK provided proposal filters", () => { describe("disallowProvidersByNameRegex", () => { test("Accepts only the providers which name doesn't match the specified regex", () => { - const p1 = mock(Proposal); - const p2 = mock(Proposal); + const p1 = mock(OfferProposal); + const p2 = mock(OfferProposal); when(p1.provider).thenReturn({ id: "provider-1", @@ -88,8 +88,8 @@ describe("SDK provided proposal filters", () => { describe("allowProvidersById", () => { test("Accepts only the providers who's ID's are on the list", () => { - const p1 = mock(Proposal); - const p2 = mock(Proposal); + const p1 = mock(OfferProposal); + const p2 = mock(OfferProposal); when(p1.provider).thenReturn({ id: "provider-1", @@ -111,8 +111,8 @@ describe("SDK provided proposal filters", () => { describe("allowProvidersByNameRegex", () => { test("Accepts only the providers who's names match the provided regex", () => { - const p1 = mock(Proposal); - const p2 = mock(Proposal); + const p1 = mock(OfferProposal); + const p2 = mock(OfferProposal); when(p1.provider).thenReturn({ id: "provider-1", diff --git a/src/market/strategy.ts b/src/market/strategy.ts index 980ebe324..b15e7e252 100644 --- a/src/market/strategy.ts +++ b/src/market/strategy.ts @@ -1,30 +1,30 @@ -import { Proposal } from "./proposal"; +import { OfferProposal } from "./offer-proposal"; /** Default Proposal filter that accept all proposal coming from the market */ export const acceptAll = () => () => true; /** Proposal filter blocking every offer coming from a provider whose id is in the array */ -export const disallowProvidersById = (providerIds: string[]) => (proposal: Proposal) => +export const disallowProvidersById = (providerIds: string[]) => (proposal: OfferProposal) => !providerIds.includes(proposal.provider.id); /** Proposal filter blocking every offer coming from a provider whose name is in the array */ -export const disallowProvidersByName = (providerNames: string[]) => (proposal: Proposal) => +export const disallowProvidersByName = (providerNames: string[]) => (proposal: OfferProposal) => !providerNames.includes(proposal.provider.name); /** Proposal filter blocking every offer coming from a provider whose name match to the regexp */ -export const disallowProvidersByNameRegex = (regexp: RegExp) => (proposal: Proposal) => +export const disallowProvidersByNameRegex = (regexp: RegExp) => (proposal: OfferProposal) => !proposal.provider.name.match(regexp); /** Proposal filter that only allows offers from a provider whose id is in the array */ -export const allowProvidersById = (providerIds: string[]) => (proposal: Proposal) => +export const allowProvidersById = (providerIds: string[]) => (proposal: OfferProposal) => providerIds.includes(proposal.provider.id); /** Proposal filter that only allows offers from a provider whose name is in the array */ -export const allowProvidersByName = (providerNames: string[]) => (proposal: Proposal) => +export const allowProvidersByName = (providerNames: string[]) => (proposal: OfferProposal) => providerNames.includes(proposal.provider.name); /** Proposal filter that only allows offers from a provider whose name match to the regexp */ -export const allowProvidersByNameRegex = (regexp: RegExp) => (proposal: Proposal) => +export const allowProvidersByNameRegex = (regexp: RegExp) => (proposal: OfferProposal) => !!proposal.provider.name.match(regexp); export type PriceLimits = { @@ -40,7 +40,7 @@ export type PriceLimits = { * @param priceLimits.cpuPerSec The maximum price for CPU usage in GLM/s * @param priceLimits.envPerSec The maximum price for the duration of the activity in GLM/s */ -export const limitPriceFilter = (priceLimits: PriceLimits) => (proposal: Proposal) => { +export const limitPriceFilter = (priceLimits: PriceLimits) => (proposal: OfferProposal) => { return ( proposal.pricing.cpuSec <= priceLimits.cpuPerSec && proposal.pricing.envSec <= priceLimits.envPerSec && diff --git a/src/shared/yagna/adapters/agreement-api-adapter.ts b/src/shared/yagna/adapters/agreement-api-adapter.ts index 7178f89f5..f4ef0919c 100644 --- a/src/shared/yagna/adapters/agreement-api-adapter.ts +++ b/src/shared/yagna/adapters/agreement-api-adapter.ts @@ -1,6 +1,6 @@ import { Agreement, IAgreementApi, IAgreementRepository } from "../../../agreement/agreement"; import { MarketApi } from "ya-ts-client"; -import { GolemMarketError, MarketErrorCode, ProposalNew } from "../../../market"; +import { GolemMarketError, MarketErrorCode, OfferProposal } from "../../../market"; import { withTimeout } from "../../utils/timeout"; import { Logger } from "../../utils"; import { AgreementApiConfig } from "../../../agreement"; @@ -38,7 +38,7 @@ export class AgreementApiAdapter implements IAgreementApi { } } - async createAgreement(proposal: ProposalNew): Promise { + async createAgreement(proposal: OfferProposal): Promise { try { const agreementProposalRequest = { proposalId: proposal.id, @@ -81,7 +81,7 @@ export class AgreementApiAdapter implements IAgreementApi { } } - async proposeAgreement(proposal: ProposalNew): Promise { + async proposeAgreement(proposal: OfferProposal): Promise { const agreement = await this.createAgreement(proposal); const confirmed = await this.confirmAgreement(agreement); const state = confirmed.getState(); diff --git a/src/shared/yagna/adapters/market-api-adapter.test.ts b/src/shared/yagna/adapters/market-api-adapter.test.ts index 01f625748..67b43145b 100644 --- a/src/shared/yagna/adapters/market-api-adapter.test.ts +++ b/src/shared/yagna/adapters/market-api-adapter.test.ts @@ -2,7 +2,7 @@ import { instance, when, verify, deepEqual, mock, reset, _, imock } from "@johan import * as YaTsClient from "ya-ts-client"; import { YagnaApi } from "../yagnaApi"; import { DemandRequestBody, MarketApiAdapter } from "./market-api-adapter"; -import { Demand, DemandSpecification, ProposalNew } from "../../../market"; +import { Demand, DemandSpecification, OfferProposal } from "../../../market"; import { take, takeUntil, timer } from "rxjs"; import { Logger } from "../../utils"; import { DemandBodyPrototype } from "../../../market/demand/demand-body-builder"; @@ -23,6 +23,14 @@ describe("Market Api Adapter", () => { const samplePrototype: DemandBodyPrototype = { constraints: ["constraints"], properties: [ + { + key: "golem.com.usage.vector", + value: ["golem.usage.duration_sec", "golem.usage.cpu_sec"], + }, + { + key: "golem.com.pricing.model.linear.coeffs", + value: [0.1, 0.1], + }, { key: "property-key-1", value: "property-value-1", @@ -37,6 +45,8 @@ describe("Market Api Adapter", () => { const expectedBody: DemandRequestBody = { constraints: "constraints", properties: { + "golem.com.usage.vector": ["golem.usage.duration_sec", "golem.usage.cpu_sec"], + "golem.com.pricing.model.linear.coeffs": [0.1, 0.1], "property-key-1": "property-value-1", "property-key-2": "property-value-2", }, @@ -103,7 +113,7 @@ describe("Market Api Adapter", () => { it("should negotiate a proposal with the selected payment platform", async () => { const specification = new DemandSpecification(samplePrototype, "my-selected-payment-platform", 60 * 60 * 1000); - const receivedProposal = new ProposalNew( + const receivedProposal = new OfferProposal( { ...expectedBody, proposalId: "proposal-id", @@ -124,29 +134,25 @@ describe("Market Api Adapter", () => { state: "Draft", }); - const counterProposal = await api.counterProposal(receivedProposal, specification); + await api.counterProposal(receivedProposal, specification); verify( mockMarket.counterProposalDemand( "demand-id", "proposal-id", deepEqual({ - constraints: "constraints", + constraints: expectedBody.constraints, properties: { - "property-key-1": "property-value-1", - "property-key-2": "property-value-2", + ...expectedBody.properties, "golem.com.payment.chosen-platform": "my-selected-payment-platform", }, }), ), ).once(); - expect(counterProposal).toBeInstanceOf(ProposalNew); - expect(counterProposal.id).toBe("counter-id"); - expect(counterProposal.demand).toBe(receivedProposal.demand); }); it("should throw an error if the counter proposal fails", async () => { const specification = new DemandSpecification(samplePrototype, "my-selected-payment-platform", 60 * 60 * 1000); - const receivedProposal = new ProposalNew( + const receivedProposal = new OfferProposal( { ...expectedBody, proposalId: "proposal-id", diff --git a/src/shared/yagna/adapters/market-api-adapter.ts b/src/shared/yagna/adapters/market-api-adapter.ts index d954d23a6..61e2e7efe 100644 --- a/src/shared/yagna/adapters/market-api-adapter.ts +++ b/src/shared/yagna/adapters/market-api-adapter.ts @@ -1,5 +1,13 @@ import { Observable } from "rxjs"; -import { Demand, DemandSpecification, MarketApi, ProposalEvent, ProposalNew } from "../../../market"; +import { + Demand, + DemandSpecification, + MarketApi, + ProposalEvent, + OfferProposal, + GolemMarketError, + MarketErrorCode, +} from "../../../market"; import { YagnaApi } from "../yagnaApi"; import YaTsClient from "ya-ts-client"; import { GolemInternalError } from "../../error/golem-error"; @@ -8,6 +16,17 @@ import { DemandBodyPrototype, DemandPropertyValue } from "../../../market/demand /** * A bit more user-friendly type definition of DemandOfferBaseDTO from ya-ts-client + * + * That's probably one of the most confusing elements around Golem Protocol and the API specificiation: + * + * - Providers create Offers + * - Requestors create Demands + * - Demands are used to create a subscription for Proposals - Initial ones reflect the Offer that was matched with the Demand used to subscribe + * - Once the proposal is countered, it's countered with a "counter proposal" which is no longer Offer + Demand, + * but rather a sketch of the agreement - here both parties try to agree on the values of certain properties that + * are interesting from their perspective. These "negotiated proposals (of) ...." are buit using DemandOffeBaseDTO + * + * #FIXME yagna - feedback in the note above */ export type DemandRequestBody = { properties: Record; @@ -79,20 +98,38 @@ export class MarketApiAdapter implements MarketApi { }); } - async counterProposal(receivedProposal: ProposalNew, demand: DemandSpecification): Promise { + async counterProposal(receivedProposal: OfferProposal, demand: DemandSpecification): Promise { const bodyClone = structuredClone(this.buildDemandRequestBody(demand.prototype)); bodyClone.properties["golem.com.payment.chosen-platform"] = demand.paymentPlatform; + const maybeNewId = await this.yagnaApi.market.counterProposalDemand( receivedProposal.demand.id, receivedProposal.id, bodyClone, ); + + this.logger.debug("Proposal counter result from yagna", { result: maybeNewId }); + if (typeof maybeNewId !== "string") { throw new GolemInternalError(`Counter proposal failed ${maybeNewId.message}`); } - const counterProposalDto = await this.yagnaApi.market.getProposalOffer(receivedProposal.demand.id, maybeNewId); - return new ProposalNew(counterProposalDto, receivedProposal.demand); + } + + async rejectProposal(receivedProposal: OfferProposal, reason: string): Promise { + try { + const result = await this.yagnaApi.market.rejectProposalOffer(receivedProposal.demand.id, receivedProposal.id, { + message: reason, + }); + + this.logger.debug("Proposal rejection result from yagna", { response: result }); + } catch (error) { + throw new GolemMarketError( + `Failed to reject proposal. ${error?.response?.data?.message || error}`, + MarketErrorCode.ProposalRejectionFailed, + error, + ); + } } private buildDemandRequestBody(decorations: DemandBodyPrototype): DemandRequestBody { diff --git a/src/shared/yagna/repository/debit-note-repository.ts b/src/shared/yagna/repository/debit-note-repository.ts index 73aabe490..d7f4f1cbd 100644 --- a/src/shared/yagna/repository/debit-note-repository.ts +++ b/src/shared/yagna/repository/debit-note-repository.ts @@ -1,6 +1,6 @@ import { DebitNote, IDebitNoteRepository } from "../../../payment/debit_note"; import { MarketApi, PaymentApi } from "ya-ts-client"; -import { ProposalProperties } from "../../../market/proposal"; +import { ProposalProperties } from "../../../market/offer-proposal"; export class DebitNoteRepository implements IDebitNoteRepository { constructor( diff --git a/src/shared/yagna/repository/invoice-repository.ts b/src/shared/yagna/repository/invoice-repository.ts index e497ecc0f..8f616f8d3 100644 --- a/src/shared/yagna/repository/invoice-repository.ts +++ b/src/shared/yagna/repository/invoice-repository.ts @@ -1,6 +1,6 @@ import { IInvoiceRepository, Invoice } from "../../../payment/invoice"; import { MarketApi, PaymentApi } from "ya-ts-client"; -import { ProposalProperties } from "../../../market/proposal"; +import { ProposalProperties } from "../../../market/offer-proposal"; export class InvoiceRepository implements IInvoiceRepository { constructor( diff --git a/src/shared/yagna/repository/proposal-repository.ts b/src/shared/yagna/repository/proposal-repository.ts index 41df97a5f..e8089a584 100644 --- a/src/shared/yagna/repository/proposal-repository.ts +++ b/src/shared/yagna/repository/proposal-repository.ts @@ -1,4 +1,4 @@ -import { IProposalRepository, ProposalNew } from "../../../market/proposal"; +import { IProposalRepository, OfferProposal } from "../../../market/offer-proposal"; import { MarketApi } from "ya-ts-client"; import { Demand } from "../../../market"; import { CacheService } from "../../cache/CacheService"; @@ -6,10 +6,10 @@ import { CacheService } from "../../cache/CacheService"; export class ProposalRepository implements IProposalRepository { constructor( private readonly api: MarketApi.RequestorService, - private readonly cache: CacheService, + private readonly cache: CacheService, ) {} - add(proposal: ProposalNew) { + add(proposal: OfferProposal) { this.cache.set(proposal.id, proposal); return proposal; } @@ -18,8 +18,8 @@ export class ProposalRepository implements IProposalRepository { return this.cache.get(id); } - async getByDemandAndId(demand: Demand, id: string): Promise { + async getByDemandAndId(demand: Demand, id: string): Promise { const dto = await this.api.getProposalOffer(demand.id, id); - return new ProposalNew(dto, demand); + return new OfferProposal(dto, demand); } } diff --git a/tests/unit/agreement_pool_service.test.ts b/tests/unit/agreement_pool_service.test.ts index 39bfc92e8..23087cba3 100644 --- a/tests/unit/agreement_pool_service.test.ts +++ b/tests/unit/agreement_pool_service.test.ts @@ -1,5 +1,5 @@ import { anything, imock, instance, mock, reset, when } from "@johanblumenberg/ts-mockito"; -import { Agreement, AgreementPoolService, Demand, Proposal, ProposalNew, YagnaApi } from "../../src"; +import { Agreement, AgreementPoolService, Demand, OfferProposal, YagnaApi } from "../../src"; import { MarketApi } from "ya-ts-client"; import { LoggerMock } from "../mock/utils/logger"; import { IAgreementApi } from "../../src/agreement/agreement"; @@ -42,7 +42,7 @@ const createProposal = (id: string) => { }, }; - return new ProposalNew(model, testDemand); + return new OfferProposal(model, testDemand); }; const sample: MarketApi.AgreementDTO = {