Skip to content

Commit

Permalink
Merge pull request #935 from golemfactory/feature/JST-92/market-agree…
Browse files Browse the repository at this point in the history
…ment-events

feat(market): implemented agreement and offer events
  • Loading branch information
mgordel authored Jun 6, 2024
2 parents 8fa80cf + 53e7ef5 commit b12d85d
Show file tree
Hide file tree
Showing 57 changed files with 1,628 additions and 1,011 deletions.
5 changes: 2 additions & 3 deletions examples/advanced/hello-world.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,10 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger";
expirationSec: 60 * 60, // 60 minutes
});
const demandSpecification = await glm.market.buildDemandDetails(order.demand, allocation);
const proposal$ = glm.market.startCollectingProposals({
const draftProposal$ = glm.market.collectDraftOfferProposals({
demandSpecification,
bufferSize: 15,
});
const proposalSubscription = proposalPool.readFrom(proposal$);
const proposalSubscription = proposalPool.readFrom(draftProposal$);
const draftProposal = await proposalPool.acquire();

const agreement = await glm.market.proposeAgreement(draftProposal);
Expand Down
5 changes: 2 additions & 3 deletions examples/advanced/local-image/serveLocalGvmi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ const getImagePath = (path: string) => fileURLToPath(new URL(path, import.meta.u
expirationSec: 30 * 60, // 30 minutes
});
const demandSpecification = await glm.market.buildDemandDetails(demand.demand, allocation);
const proposal$ = glm.market.startCollectingProposals({
const draftProposal$ = glm.market.collectDraftOfferProposals({
demandSpecification,
bufferSize: 15,
});
const proposalSubscription = proposalPool.readFrom(proposal$);
const proposalSubscription = proposalPool.readFrom(draftProposal$);
const draftProposal = await proposalPool.acquire();

const agreement = await glm.market.proposeAgreement(draftProposal);
Expand Down
4 changes: 2 additions & 2 deletions examples/advanced/manual-pools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ const demandOptions = {
const proposalPool = new DraftOfferProposalPool({ minCount: 1 });
const demandSpecification = await glm.market.buildDemandDetails(demandOptions.demand, allocation);

const proposals$ = glm.market.startCollectingProposals({
const draftProposal$ = glm.market.collectDraftOfferProposals({
demandSpecification,
});

const proposalSubscription = proposalPool.readFrom(proposals$);
const proposalSubscription = proposalPool.readFrom(draftProposal$);

/** How many providers you plan to engage simultaneously */
const CONCURRENCY = 2;
Expand Down
7 changes: 3 additions & 4 deletions examples/advanced/proposal-filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger";
* Example demonstrating how to write a custom proposal filter.
* In this case the proposal must include VPN access and must not be from "bad-provider"
*/
const myFilter: ProposalFilter = (proposal) => {
return (
proposal.provider.name !== "bad-provider" && proposal.properties["golem.runtime.capabilities"]?.includes("vpn")
const myFilter: ProposalFilter = (proposal) =>
Boolean(
proposal.provider.name !== "bad-provider" && proposal.properties["golem.runtime.capabilities"]?.includes("vpn"),
);
};

const order: MarketOrderSpec = {
demand: {
Expand Down
55 changes: 55 additions & 0 deletions examples/advanced/proposal-selector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { MarketOrderSpec, GolemNetwork, OfferProposal } from "@golem-sdk/golem-js";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

/**
* Example demonstrating how to write a selector which choose the best provider based on scores provided as object: [providerName]: score
* A higher score rewards the provider.
*/
const scores = {
"provider-1": 100,
"golem-provider": 50,
"super-provider": 25,
};

const bestProviderSelector = (scores: { [providerName: string]: number }) => (proposals: OfferProposal[]) => {
proposals.sort((a, b) => ((scores?.[a.provider.name] || 0) >= (scores?.[b.provider.name] || 0) ? -1 : 1));
return proposals[0];
};

const order: MarketOrderSpec = {
demand: {
workload: { imageTag: "golem/alpine:latest" },
},
market: {
rentHours: 0.5,
pricing: {
model: "linear",
maxStartPrice: 0.5,
maxCpuPerHourPrice: 1.0,
maxEnvPerHourPrice: 0.5,
},
proposalSelector: bestProviderSelector(scores),
},
};

(async () => {
const glm = new GolemNetwork({
logger: pinoPrettyLogger({
level: "info",
}),
});

try {
await glm.connect();
const lease = await glm.oneOf(order);
await lease
.getExeUnit()
.then((exe) => exe.run(`echo [provider:${exe.provider.name}] Hello, Golem! 👋`))
.then((res) => console.log(res.stdout));
await lease.finalize();
} catch (err) {
console.error("Failed to run the example", err);
} finally {
await glm.disconnect();
}
})().catch(console.error);
64 changes: 64 additions & 0 deletions examples/basic/events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* This example showcases how users can listen to various events exposed from golem-js
*/
import { GolemNetwork } from "@golem-sdk/golem-js";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

(async () => {
const glm = new GolemNetwork({
logger: pinoPrettyLogger({
level: "info",
}),
payment: {
driver: "erc20",
network: "holesky",
},
});

try {
await glm.connect();

glm.market.events.on("agreementApproved", (event) => {
console.log("Agreement '%s' approved at %s", event.agreement.id, event.timestamp);
});

glm.market.events.on("agreementTerminated", (event) => {
console.log(
"Agreement '%s' terminated by '%s' with reason '%s'",
event.agreement.id,
event.terminatedBy,
event.reason,
);
});

glm.market.events.on("offerCounterProposalRejected", (event) => {
console.warn("Proposal rejected by provider", event);
});

const lease = await glm.oneOf({
demand: {
workload: { imageTag: "golem/alpine:latest" },
},
market: {
rentHours: 0.5,
pricing: {
model: "linear",
maxStartPrice: 0.5,
maxCpuPerHourPrice: 1.0,
maxEnvPerHourPrice: 0.5,
},
},
});

await lease
.getExeUnit()
.then((exe) => exe.run("echo Hello, Golem! 👋"))
.then((res) => console.log(res.stdout));

await lease.finalize();
} catch (err) {
console.error("Failed to run the example", err);
} finally {
await glm.disconnect();
}
})().catch(console.error);
1 change: 1 addition & 0 deletions examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"basic-many-of": "tsx basic/many-of.ts",
"basic-vpn": "tsx basic/vpn.ts",
"basic-transfer": "tsx basic/transfer.ts",
"basic-events": "tsx basic/events.ts",
"basic-run-and-stream": "tsx basic/run-and-stream.ts",
"advanced-hello-world": "tsx advanced/hello-world.ts",
"advanced-manual-pools": "tsx advanced/manual-pools.ts",
Expand Down
30 changes: 16 additions & 14 deletions src/experimental/deployment/deployment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import { DraftOfferProposalPool, MarketModule } from "../../market";
import { PaymentModule } from "../../payment";
import { CreateLeaseProcessPoolOptions } from "./builder";
import { Subscription } from "rxjs";
import { LeaseProcessPool } from "../../lease-process";
import { LeaseModule } from "../../lease-process/lease.module";
import { LeaseModule, LeaseProcessPool } from "../../lease-process";

export enum DeploymentState {
INITIAL = "INITIAL",
Expand Down Expand Up @@ -152,18 +151,18 @@ export class Deployment {
: undefined;

const demandSpecification = await this.modules.market.buildDemandDetails(pool.options.demand, allocation);
const proposalPool = new DraftOfferProposalPool();

const proposalSubscription = this.modules.market
.startCollectingProposals({
demandSpecification,
filter: pool.options.market.proposalFilter,
bufferSize: 10,
})
.subscribe({
next: (proposals) => proposals.forEach((proposal) => proposalPool.add(proposal)),
error: (e) => this.logger.error("Error while collecting proposals", e),
});
const proposalPool = new DraftOfferProposalPool({
logger: this.logger,
validateProposal: pool.options.market.proposalFilter,
selectProposal: pool.options.market.proposalSelector,
});

const draftProposal$ = this.modules.market.collectDraftOfferProposals({
demandSpecification,
filter: pool.options.market.proposalFilter,
});

const proposalSubscription = proposalPool.readFrom(draftProposal$);

const leaseProcessPool = this.modules.lease.createLeaseProcessPool(proposalPool, allocation, {
replicas: pool.options.deployment?.replicas,
Expand All @@ -172,6 +171,9 @@ export class Deployment {
activity: pool.options?.activity,
payment: pool.options?.payment,
},
agreementOptions: {
expirationSec: pool.options.market.rentHours * 3600,
},
});
this.pools.set(pool.name, {
proposalPool,
Expand Down
2 changes: 1 addition & 1 deletion src/experimental/reputation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export {
DEFAULT_AGREEMENT_TOP_POOL_SIZE,
} from "./system";
export { ReputationWeights } from "./types";
export { AgreementSelectorOptions } from "./types";
export { ProposalSelectorOptions } from "./types";
export { ProposalFilterOptions } from "./types";
export { ReputationData } from "./types";
export { ReputationProviderEntry } from "./types";
Expand Down
48 changes: 24 additions & 24 deletions src/experimental/reputation/system.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { ProposalFilter, OfferProposal } from "../../market";
import { AgreementCandidate, AgreementSelector } from "../../market/agreement";
import { ProposalFilter, OfferProposal, ProposalSelector } from "../../market";
import { GolemReputationError } from "./error";
import {
AgreementSelectorOptions,
ProposalSelectorOptions,
ProposalFilterOptions,
ReputationConfig,
ReputationData,
Expand Down Expand Up @@ -68,7 +67,7 @@ export const REPUTATION_PRESETS: ReputationPresets = {
cpuSingleThreadScore: 1,
},
},
agreementSelector: {
proposalSelector: {
weights: {
cpuSingleThreadScore: 1,
},
Expand All @@ -86,7 +85,7 @@ export const REPUTATION_PRESETS: ReputationPresets = {
cpuMultiThreadScore: 0.2,
},
},
agreementSelector: {
proposalSelector: {
weights: {
uptime: 0.5,
cpuMultiThreadScore: 0.5,
Expand Down Expand Up @@ -174,7 +173,7 @@ export class ReputationSystem {
* Default options used when creating agreement selector.
* @private
*/
private defaultAgreementSelectorOptions: AgreementSelectorOptions;
private defaultAgreementSelectorOptions: ProposalSelectorOptions;

/**
* Create a new reputation system client and fetch the reputation data.
Expand All @@ -196,7 +195,6 @@ export class ReputationSystem {
};
this.defaultAgreementSelectorOptions = {
topPoolSize: DEFAULT_AGREEMENT_TOP_POOL_SIZE,
agreementBonus: 0,
};

if (this.config?.preset) {
Expand All @@ -218,8 +216,8 @@ export class ReputationSystem {
this.setProposalWeights(presetConfig.proposalFilter.weights);
}

if (presetConfig.agreementSelector?.weights) {
this.setAgreementWeights(presetConfig.agreementSelector.weights);
if (presetConfig.proposalSelector?.weights) {
this.setAgreementWeights(presetConfig.proposalSelector.weights);
}

this.defaultProposalFilterOptions = {
Expand All @@ -228,9 +226,10 @@ export class ReputationSystem {
};

this.defaultAgreementSelectorOptions = {
topPoolSize: presetConfig.agreementSelector?.topPoolSize ?? this.defaultAgreementSelectorOptions.topPoolSize,
agreementBonus:
presetConfig.agreementSelector?.agreementBonus ?? this.defaultAgreementSelectorOptions.agreementBonus,
topPoolSize: presetConfig.proposalSelector?.topPoolSize ?? this.defaultAgreementSelectorOptions.topPoolSize,
// TODO: to be discussed with the reputation team
// agreementBonus:
// presetConfig.proposalSelector?.agreementBonus ?? this.defaultAgreementSelectorOptions.agreementBonus,
};
}

Expand Down Expand Up @@ -396,22 +395,23 @@ export class ReputationSystem {
*
* @param opts
*/
agreementSelector(opts?: AgreementSelectorOptions): AgreementSelector {
agreementSelector(opts?: ProposalSelectorOptions): ProposalSelector {
const poolSize =
opts?.topPoolSize ?? this.defaultAgreementSelectorOptions.topPoolSize ?? DEFAULT_AGREEMENT_TOP_POOL_SIZE;

return async (candidates): Promise<AgreementCandidate> => {
return (proposals): OfferProposal => {
// Cache scores for providers.
const scoresMap = new Map<string, number>();

candidates.forEach((c) => {
const data = this.providersScoreMap.get(c.proposal.provider.id)?.scores ?? {};
let score = this.calculateScore(data, this.agreementWeights);
if (c.agreement) score += opts?.agreementBonus ?? this.defaultAgreementSelectorOptions.agreementBonus ?? 0;
scoresMap.set(c.proposal.provider.id, score);
proposals.forEach((c) => {
const data = this.providersScoreMap.get(c.provider.id)?.scores ?? {};
const score = this.calculateScore(data, this.agreementWeights);
// TODO: to be discussed with the reputation team
// if (c.agreement) score += opts?.agreementBonus ?? this.defaultAgreementSelectorOptions.agreementBonus ?? 0;
scoresMap.set(c.provider.id, score);
});

const array = this.sortCandidatesByScore(candidates, scoresMap);
const array = this.sortCandidatesByScore(proposals, scoresMap);

const topPool = Math.min(poolSize, array.length);
const index = topPool === 1 ? 0 : Math.floor(Math.random() * topPool);
Expand Down Expand Up @@ -456,12 +456,12 @@ export class ReputationSystem {
});
}

sortCandidatesByScore(candidates: AgreementCandidate[], scoresMap: Map<string, number>): AgreementCandidate[] {
const array = Array.from(candidates);
sortCandidatesByScore(proposals: OfferProposal[], scoresMap: Map<string, number>): OfferProposal[] {
const array = Array.from(proposals);

array.sort((a, b) => {
const aId = a.proposal.provider.id;
const bId = b.proposal.provider.id;
const aId = a.provider.id;
const bId = b.provider.id;

// Get the score values.
const aScoreValue = scoresMap.get(aId) ?? 0;
Expand Down
11 changes: 2 additions & 9 deletions src/experimental/reputation/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export interface ProposalFilterOptions {
* Options for the agreement selector.
* @experimental
*/
export interface AgreementSelectorOptions {
export interface ProposalSelectorOptions {
/**
* The size of top provider pool used to pick a random one.
*
Expand All @@ -105,13 +105,6 @@ export interface AgreementSelectorOptions {
* Default is `DEFAULT_AGREEMENT_TOP_POOL_SIZE`.
*/
topPoolSize?: number;

/**
* Add extra score to provider if it has an existing agreement.
*
* Default is 0.
*/
agreementBonus?: number;
}

/**
Expand All @@ -135,7 +128,7 @@ export interface ReputationWeightsMixin {
*/
export interface ReputationPreset {
proposalFilter?: ProposalFilterOptions & ReputationWeightsMixin;
agreementSelector?: AgreementSelectorOptions & ReputationWeightsMixin;
proposalSelector?: ProposalSelectorOptions & ReputationWeightsMixin;
}

/**
Expand Down
Loading

0 comments on commit b12d85d

Please sign in to comment.