diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fd9079fce..2fa4664dc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -39,3 +39,5 @@ jobs: npm run lint npm run test:unit npm run build + npm install --prefix examples + npm run --prefix examples lint:ts diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 86a5b8e66..2bdda5847 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -42,7 +42,8 @@ jobs: npm run format:check npm run lint npm run test:unit - npm run build + npm install --prefix examples + npm run --prefix examples lint:ts run-integration-and-e2e-tests: name: Run integration and E2E tests diff --git a/README.md b/README.md index d9f8c1b4a..b211150ff 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,28 @@ # Golem JavaScript API +## Table of contents + + + +- [Golem JavaScript API](#golem-javascript-api) + - [Table of contents](#table-of-contents) + - [What's Golem and `golem-js`?](#whats-golem-and-golem-js) + - [Golem application development](#golem-application-development) + - [Installation](#installation) + - [Building](#building) + - [Usage](#usage) + - [Node.js context](#nodejs-context) + - [Web Browser context](#web-browser-context) + - [Testing](#testing) + - [Running unit tests](#running-unit-tests) + - [Running E2E tests](#running-e2e-tests) + - [NodeJS](#nodejs) + - [Cypress](#cypress) + - [Contributing](#contributing) + - [Controlling interactions and costs](#controlling-interactions-and-costs) + - [See also](#see-also) + + ![GitHub](https://img.shields.io/github/license/golemfactory/golem-js) ![npm](https://img.shields.io/npm/v/@golem-sdk/golem-js) ![node-current](https://img.shields.io/node/v/@golem-sdk/golem-js) @@ -10,13 +33,18 @@ ## What's Golem and `golem-js`? -**[The Golem Network](https://golem.network)** fosters a global group of creators building ambitious software solutions that will shape the technological landscape of future generations by accessing computing resources across the platform. Golem Network is an accessible, reliable, open access and censorship-resistant protocol, democratizing access to digital resources and connecting users through a flexible, open-source platform. +**[The Golem Network](https://golem.network)** fosters a global group of creators building ambitious software solutions +that will shape the technological landscape of future generations by accessing computing resources across the platform. +Golem Network is an accessible, reliable, open access and censorship-resistant protocol, democratizing access to digital +resources and connecting users through a flexible, open-source platform. -**@golem-sdk/golem-js** is the JavaScript API that allows developers to connect to their Golem nodes and manage their distributed, computational loads through Golem Network. +**@golem-sdk/golem-js** is the JavaScript API that allows developers to connect to their Golem nodes and manage their +distributed, computational loads through Golem Network. ## Golem application development -For a detailed introduction to using Golem and `@golem-sdk/golem-js` to run your tasks on Golem [please consult our quickstart section](https://docs.golem.network/creators/javascript/quickstart/). +For a detailed introduction to using Golem and `@golem-sdk/golem-js` to run your tasks on +Golem [please consult our quickstart section](https://docs.golem.network/creators/javascript/quickstart/). ### Installation @@ -68,7 +96,8 @@ import { TaskExecutor } from "@golem-sdk/golem-js"; ![hello_web](https://user-images.githubusercontent.com/26308335/217530424-a1dd4487-f95f-43e6-a91b-7106b6f30802.gif) -For more detailed usage examples and tutorials, see the [Java Script API section of the Golem Network Docs](https://docs.golem.network/creators/javascript/) +For more detailed usage examples and tutorials, see +the [Java Script API section of the Golem Network Docs](https://docs.golem.network/creators/javascript/) ### Testing @@ -84,9 +113,13 @@ yarn test:unit ### Running E2E tests -Both test cases for the NodeJS environment and the browser (cypress) require preparation of a test environment of the Golem Network with providers and all the necessary infrastructure. [Goth](https://github.com/golemfactory/goth) framework is used for this purpose. +Both test cases for the NodeJS environment and the browser (cypress) require preparation of a test environment of the +Golem Network with Providers and all the necessary infrastructure. [Goth](https://github.com/golemfactory/goth) +framework is used for this purpose. -To enable E2E testing, you need to ensure that `python -m goth` is executable. Therefore, you must first install [Goth](https://github.com/golemfactory/goth) according to the instructions described in the readme of the project. +To enable E2E testing, you need to ensure that `python -m goth` is executable. Therefore, you must first +install [Goth](https://github.com/golemfactory/goth) according to the instructions described in the readme of the +project. #### NodeJS @@ -114,12 +147,51 @@ yarn lint yarn format ``` +## Controlling interactions and costs + +The Golem Network provides an open marketplace where anyone can join as a Provider and supply the network with their +computing power. In return for their service, they are billing Requestors (users of this SDK) according to the pricing +that they define. As a Requestor, you might want to: + +- control the limit price so that you're not going to over-spend your funds +- control the interactions with the providers if you have a list of the ones which you like or the ones which you would + like to avoid + +To make this easy, we provided you with a set of predefined market proposal filters, which you can combine to implement +your own market strategy. For example: + +```typescript +import { TaskExecutor, ProposalFilters } from "@golem-sdk/golem-js"; + +const executor = await TaskExecutor.create({ + // What do you want to run + package: "golem/alpine:3.18.2", + + // How much you wish to spend + budget: 0.5, + proposalFilter: ProposalFilters.limitPriceFilter({ + start: 1, + cpuPerSec: 1 / 3600, + envPerSec: 1 / 3600, + }), + + // Where you want to spend + payment: { + network: "polygon", + }, +}); +``` + +To learn more about other filters, please check the [API reference of the market/strategy module](https://docs.golem.network/docs/golem-js/reference/modules/market_strategy) + ## See also - [Golem](https://golem.network), a global, open-source, decentralized supercomputer that anyone can access. - Learn what you need to know to set up your Golem requestor node: - [Requestor development: a quick primer](https://handbook.golem.network/requestor-tutorials/flash-tutorial-of-requestor-development) - [Quick start](https://docs.golem.network/creators/javascript/quickstart/) -- Have a look at the most important concepts behind any Golem application: [Golem application fundamentals](https://handbook.golem.network/requestor-tutorials/golem-application-fundamentals) -- Learn about preparing your own Docker-like images for the [VM runtime](https://handbook.golem.network/requestor-tutorials/vm-runtime) +- Have a look at the most important concepts behind any Golem + application: [Golem application fundamentals](https://handbook.golem.network/requestor-tutorials/golem-application-fundamentals) +- Learn about preparing your own Docker-like images for + the [VM runtime](https://handbook.golem.network/requestor-tutorials/vm-runtime) - Write your own app with [JavaScript API](https://docs.golem.network/creators/javascript/high-level/task-model/) diff --git a/examples/package.json b/examples/package.json index 5e9c108a0..b8610f646 100644 --- a/examples/package.json +++ b/examples/package.json @@ -18,7 +18,8 @@ "fibonacci": "node ./fibonacci/fibonacci.js", "ssh": "ts-node ./ssh/ssh.ts", "tag": "ts-node ./simple-usage/tag.ts", - "web": "node ./web/app.mjs" + "web": "node ./web/app.mjs", + "lint:ts": "tsc --project tsconfig.json --noEmit" }, "author": "GolemFactory ", "license": "LGPL-3.0", diff --git a/examples/simple-usage/map.ts b/examples/simple-usage/map.ts index a722c12de..929b48202 100644 --- a/examples/simple-usage/map.ts +++ b/examples/simple-usage/map.ts @@ -4,9 +4,9 @@ import { TaskExecutor } from "@golem-sdk/golem-js"; const executor = await TaskExecutor.create("9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae"); const data = ["one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"]; - const results = executor.map(data, async (ctx, x) => { + const results = executor.map(data, async (ctx, x) => { const res = await ctx.run(`echo "${x}"`); - return res.stdout?.trim(); + return res.stdout?.toString().trim(); }); const finalOutput: string[] = []; for await (const res of results) { diff --git a/examples/strategy/customProviderFilter.ts b/examples/strategy/customProviderFilter.ts index 610dd4dbf..1a6ac7303 100644 --- a/examples/strategy/customProviderFilter.ts +++ b/examples/strategy/customProviderFilter.ts @@ -1,10 +1,10 @@ -import { TaskExecutor, ProposalDTO } from "@golem-sdk/golem-js"; +import { ProposalFilter, TaskExecutor } from "@golem-sdk/golem-js"; /** * Example demonstrating how to write a custom proposal filter. * In this case the proposal must include VPN access and must not be from "bad-provider" */ -const myFilter = async (proposal: ProposalDTO) => { +const myFilter: ProposalFilter = async (proposal) => { return ( proposal.provider.name !== "bad-provider" || !proposal.properties["golem.runtime.capabilities"]?.includes("vpn") ); diff --git a/examples/yacat/yacat.ts b/examples/yacat/yacat.ts index 612617bc1..3aae6c8f7 100644 --- a/examples/yacat/yacat.ts +++ b/examples/yacat/yacat.ts @@ -15,7 +15,7 @@ async function main(args) { }); const keyspace = await executor.run(async (ctx) => { const result = await ctx.run(`hashcat --keyspace -a 3 ${args.mask} -m 400`); - return parseInt(result.stdout || ""); + return parseInt(result.stdout?.toString().trim() || ""); }); if (!keyspace) throw new Error(`Cannot calculate keyspace`); @@ -34,7 +34,7 @@ async function main(args) { .run("cat pass.potfile || true") .end(); if (!results?.[1]?.stdout) return false; - return results?.[1]?.stdout.split(":")[1]; + return results?.[1]?.stdout.toString().trim().split(":")[1]; }); let password = ""; diff --git a/src/activity/activity.ts b/src/activity/activity.ts index d03cbaf48..708ae4b0d 100644 --- a/src/activity/activity.ts +++ b/src/activity/activity.ts @@ -1,4 +1,4 @@ -import { Result, StreamingBatchEvent } from "./results"; +import { Result, ResultState, StreamingBatchEvent } from "./results"; import EventSource from "eventsource"; import { Readable } from "stream"; import { Logger } from "../utils"; @@ -188,7 +188,7 @@ export class Activity { // This will ignore "incompatibility" between ExeScriptCommandResultResultEnum and ResultState, which both // contain exactly the same entries, however TSC refuses to compile it as it assumes the former is dynamicaly // computed. - const { data: results }: { data: Result[] } = (await api.control.getExecBatchResults( + const { data: rawExecBachResults } = await api.control.getExecBatchResults( activityId, batchId, undefined, @@ -196,9 +196,9 @@ export class Activity { { timeout: 0, }, - )) as unknown as { data: Result[] }; + ); retryCount = 0; - const newResults = results.slice(lastIndex + 1); + const newResults = rawExecBachResults.map((rawResult) => new Result(rawResult)).slice(lastIndex + 1); if (Array.isArray(newResults) && newResults.length) { newResults.forEach((result) => { this.push(result); @@ -333,15 +333,19 @@ export class Activity { private parseEventToResult(msg: string, batchSize: number): Result { try { const event: StreamingBatchEvent = JSON.parse(msg); - return { + return new Result({ index: event.index, eventDate: event.timestamp, - result: event?.kind?.finished ? (event?.kind?.finished?.return_code === 0 ? "Ok" : "Error") : undefined, + result: event?.kind?.finished + ? event?.kind?.finished?.return_code === 0 + ? ResultState.Ok + : ResultState.Error + : ResultState.Error, stdout: event?.kind?.stdout, stderr: event?.kind?.stderr, message: event?.kind?.finished?.message, isBatchFinished: event.index + 1 >= batchSize && Boolean(event?.kind?.finished), - } as Result; + }); } catch (error) { throw new Error(`Cannot parse ${msg} as StreamingBatchEvent`); } diff --git a/src/activity/results.test.ts b/src/activity/results.test.ts new file mode 100644 index 000000000..38261d18c --- /dev/null +++ b/src/activity/results.test.ts @@ -0,0 +1,39 @@ +import { Result, ResultState } from "./results"; + +describe("Results", () => { + describe("converting output to JSON", () => { + describe("positive cases", () => { + test("produces JSON when the stdout contains correct data", () => { + const result = new Result({ + index: 0, + result: ResultState.Ok, + stdout: '{ "value": 55 }\n', + stderr: null, + message: null, + isBatchFinished: true, + eventDate: "2023-08-29T09:23:52.305095307Z", + }); + + expect(result.getOutputAsJson()).toEqual({ + value: 55, + }); + }); + }); + + describe("negative cases", () => { + test("throws an error when stdout does not contain nice JSON", () => { + const result = new Result({ + index: 0, + result: ResultState.Ok, + stdout: "not json\n", + stderr: null, + message: null, + isBatchFinished: true, + eventDate: "2023-08-29T09:23:52.305095307Z", + }); + + expect(() => result.getOutputAsJson()).toThrow("Failed to parse output to JSON!"); + }); + }); + }); +}); diff --git a/src/activity/results.ts b/src/activity/results.ts index 489aa19f1..1ac775ab6 100644 --- a/src/activity/results.ts +++ b/src/activity/results.ts @@ -1,12 +1,13 @@ -export enum ResultState { - OK = "Ok", - ERROR = "Error", -} +import { ExeScriptCommandResultResultEnum } from "ya-ts-client/dist/ya-activity/src/models/exe-script-command-result"; + +export import ResultState = ExeScriptCommandResultResultEnum; /** * @hidden */ -export interface Result { +// FIXME: Make the `data` field Uint8Array and update the rest of the code +// eslint-disable-next-line +export interface ResultData { /** Index of script command */ index: number; /** The datetime of the event on which the result was received */ @@ -14,16 +15,57 @@ export interface Result { /** If is success */ result: ResultState; /** stdout of script command */ - stdout?: string; + stdout?: string | ArrayBuffer | null; /** stderr of script command */ - stderr?: string; + stderr?: string | ArrayBuffer | null; /** an error message if the result is not successful */ - message?: string; + message?: string | null; /** Is batch of already finished */ isBatchFinished?: boolean; + + /** In case the command was related to upload or download, this will contain the transferred data */ data?: T; } +// FIXME: Make the `data` field Uint8Array and update the rest of the code +// eslint-disable-next-line +export class Result implements ResultData { + index: number; + eventDate: string; + result: ResultState; + stdout?: string | ArrayBuffer | null; + stderr?: string | ArrayBuffer | null; + message?: string | null; + isBatchFinished?: boolean; + data?: TData; + + constructor(props: ResultData) { + this.index = props.index; + this.eventDate = props.eventDate; + this.result = props.result; + this.stdout = props.stdout; + this.stderr = props.stderr; + this.message = props.message; + this.isBatchFinished = props.isBatchFinished; + this.data = props.data; + } + + /** + * Helper method making JSON-like output results more accessible + */ + public getOutputAsJson(): Output { + if (!this.stdout) { + throw new Error("Can't convert Result output to JSON, because the output is missing!"); + } + + try { + return JSON.parse(this.stdout.toString().trim()); + } catch (err) { + throw new Error(`Failed to parse output to JSON! Output: "${this.stdout.toString()}". Error: ${err}`); + } + } +} + export interface StreamingBatchEvent { batch_id: string; index: number; diff --git a/src/agreement/service.ts b/src/agreement/service.ts index b1754611e..5bdca4ccc 100644 --- a/src/agreement/service.ts +++ b/src/agreement/service.ts @@ -2,7 +2,7 @@ import Bottleneck from "bottleneck"; import { Logger } from "../utils"; import { Agreement, AgreementOptions, AgreementStateEnum } from "./agreement"; import { AgreementServiceConfig } from "./config"; -import { Proposal, ProposalDTO } from "../market/proposal"; +import { Proposal } from "../market/proposal"; import sleep from "../utils/sleep"; export interface AgreementDTO { @@ -12,7 +12,7 @@ export interface AgreementDTO { export class AgreementCandidate { agreement?: AgreementDTO; - constructor(readonly proposal: ProposalDTO) {} + constructor(readonly proposal: Proposal) {} } export type AgreementSelector = (candidates: AgreementCandidate[]) => Promise; diff --git a/src/executor/config.ts b/src/executor/config.ts index 2785d2fd4..0c71c4e88 100644 --- a/src/executor/config.ts +++ b/src/executor/config.ts @@ -7,7 +7,7 @@ const DEFAULTS = Object.freeze({ payment: { driver: "erc20", network: "goerli" }, budget: 1.0, subnetTag: "public", - logLevel: LogLevel.info, + logLevel: LogLevel.Info, basePath: "http://127.0.0.1:7465", maxParallelTasks: 5, taskTimeout: 1000 * 60 * 5, // 5 min, diff --git a/src/executor/executor.ts b/src/executor/executor.ts index 61d0796f2..ab61a509a 100644 --- a/src/executor/executor.ts +++ b/src/executor/executor.ts @@ -265,7 +265,7 @@ export class TaskExecutor { */ async run(worker: Worker): Promise { return this.executeTask(worker).catch(async (e) => { - await this.handleCriticalError(e); + this.handleCriticalError(e); return undefined; }); } diff --git a/src/index.ts b/src/index.ts index 2546744ed..7665bd8ab 100755 --- a/src/index.ts +++ b/src/index.ts @@ -8,7 +8,7 @@ export { } from "./storage"; export { ActivityStateEnum, Result } from "./activity"; export { AgreementCandidate, AgreementSelectors } from "./agreement"; -export { ProposalFilters, ProposalDTO } from "./market"; +export { ProposalFilters, ProposalFilter } from "./market"; export { Package, PackageOptions } from "./package"; export { PaymentFilters } from "./payment"; export { Events, BaseEvent, EventType } from "./events"; diff --git a/src/market/index.ts b/src/market/index.ts index 1c2ca77e6..806bf9987 100644 --- a/src/market/index.ts +++ b/src/market/index.ts @@ -1,6 +1,6 @@ export { MarketService, ProposalFilter } from "./service"; export { Demand, DemandEventType, DemandOptions, DemandEvent } from "./demand"; -export { Proposal, ProposalDetails, ProposalDTO } from "./proposal"; +export { Proposal, ProposalDetails } from "./proposal"; export { MarketDecoration } from "./builder"; export { DemandConfig } from "./config"; export * as ProposalFilters from "./strategy"; diff --git a/src/market/proposal.test.ts b/src/market/proposal.test.ts new file mode 100644 index 000000000..8066df084 --- /dev/null +++ b/src/market/proposal.test.ts @@ -0,0 +1,120 @@ +import { Proposal as ProposalModel, ProposalAllOfStateEnum } from "ya-ts-client/dist/ya-market/src/models"; +import { Proposal, ProposalProperties } from "./proposal"; +import { RequestorApi } from "ya-ts-client/dist/ya-market/api"; + +jest.mock("ya-ts-client/dist/ya-market/api"); + +const mockDemand = { + properties: {}, + constraints: "", +}; + +const mockApi = new RequestorApi(); + +const mockCounteringProposalReference = jest.fn(); + +const buildTestProposal = (props: Partial): Proposal => { + const model: ProposalModel = { + constraints: "", + issuerId: "", + proposalId: "", + state: ProposalAllOfStateEnum.Initial, + timestamp: "", + properties: props, + }; + + const proposal = new Proposal( + "example-subscriptionId", + null, + mockCounteringProposalReference, + mockApi, + model, + mockDemand, + ); + + return proposal; +}; + +describe("Proposal", () => { + describe("Validation", () => { + test("throws an error when linear pricing vector is missing", () => { + expect(() => + buildTestProposal({ + "golem.com.usage.vector": ["golem.usage.cpu_sec", "golem.usage.duration_sec"], + }), + ).toThrow("Broken proposal: the `golem.com.pricing.model.linear.coeffs` does not contain pricing information"); + }); + + test("throws an error when linear pricing vector is empty", () => { + expect(() => + buildTestProposal({ + "golem.com.usage.vector": ["golem.usage.cpu_sec", "golem.usage.duration_sec"], + "golem.com.pricing.model.linear.coeffs": [], + }), + ).toThrow("Broken proposal: the `golem.com.pricing.model.linear.coeffs` does not contain pricing information"); + }); + + test("linear pricing vector has too few items", () => { + expect(() => + buildTestProposal({ + "golem.com.usage.vector": ["golem.usage.cpu_sec", "golem.usage.duration_sec"], + "golem.com.pricing.model.linear.coeffs": [1], + }), + ).toThrow("Broken proposal: the `golem.com.pricing.model.linear.coeffs` should contain 3 price values"); + }); + + test("usage vector is empty", () => { + expect(() => + buildTestProposal({ + "golem.com.usage.vector": [], + "golem.com.pricing.model.linear.coeffs": [1, 2, 3], + }), + ).toThrow("Broken proposal: the `golem.com.usage.vector` does not contain price information"); + }); + + test("usage vector is missing", () => { + expect(() => + buildTestProposal({ + "golem.com.pricing.model.linear.coeffs": [1, 2, 3], + }), + ).toThrow("Broken proposal: the `golem.com.usage.vector` does not contain price information"); + }); + + test("usage vector is has too few items", () => { + expect(() => + buildTestProposal({ + "golem.com.usage.vector": ["golem.usage.cpu_sec"], + "golem.com.pricing.model.linear.coeffs": [1, 2, 3], + }), + ).toThrow( + "Broken proposal: the `golem.com.usage.vector` has less pricing information than `golem.com.pricing.model.linear.coeffs`", + ); + }); + }); + + describe("Extracting pricing information", () => { + describe("positive cases", () => { + test("it extracts the ENV and CPU prices based on the vector, and uses the last price value for START", () => { + const proposal = buildTestProposal({ + "golem.com.usage.vector": ["golem.usage.duration_sec", "golem.usage.cpu_sec"], + "golem.com.pricing.model.linear.coeffs": [0.01, 0.02, 0.03], + }); + + expect(proposal.pricing.envSec).toEqual(0.01); + expect(proposal.pricing.cpuSec).toEqual(0.02); + expect(proposal.pricing.start).toEqual(0.03); + }); + + test("flipping CPU and ENV in the vector still correctly matches to the prices on the pricing model", () => { + const proposal = buildTestProposal({ + "golem.com.usage.vector": ["golem.usage.cpu_sec", "golem.usage.duration_sec"], + "golem.com.pricing.model.linear.coeffs": [0.02, 0.01, 0.03], + }); + + expect(proposal.pricing.envSec).toEqual(0.01); + expect(proposal.pricing.cpuSec).toEqual(0.02); + expect(proposal.pricing.start).toEqual(0.03); + }); + }); + }); +}); diff --git a/src/market/proposal.ts b/src/market/proposal.ts index fd72abbfc..e52e11681 100644 --- a/src/market/proposal.ts +++ b/src/market/proposal.ts @@ -3,8 +3,53 @@ import { RequestorApi } from "ya-ts-client/dist/ya-market/api"; import { DemandOfferBase } from "ya-ts-client/dist/ya-market"; import { Events } from "../events"; +type PricingInfo = { + cpuSec: number; + envSec: number; + start: number; +}; + +export type ProposalProperties = Record & { + "golem.activity.caps.transfer.protocol": string[]; + "golem.com.payment.debit-notes.accept-timeout?": number; + "golem.com.payment.platform.erc20-polygon-glm.address"?: string; + "golem.com.payment.platform.erc20-goerli-tglm.address"?: string; + "golem.com.payment.platform.erc20-mumbai-tglm.address"?: string; + /** + * @deprecated rinkeby is no longer supported, use other test networks instead + */ + "golem.com.payment.platform.erc20-rinkeby-tglm.address"?: string; + /** + * @deprecated rinkeby is no longer supported, use other test networks instead + */ + "golem.com.payment.platform.zksync-rinkeby-tglm.address"?: string; + "golem.com.pricing.model": "linear"; + "golem.com.pricing.model.linear.coeffs": number[]; + "golem.com.scheme": string; + "golem.com.scheme.payu.debit-note.interval-sec?"?: number; + "golem.com.scheme.payu.payment-timeout-sec?"?: number; + "golem.com.usage.vector": string[]; + "golem.inf.cpu.architecture": string; + "golem.inf.cpu.brand": string; + "golem.inf.cpu.capabilities": string[]; + "golem.inf.cpu.cores": number; + "golem.inf.cpu.model": string; + "golem.inf.cpu.threads": number; + "golem.inf.cpu.vendor": string[]; + "golem.inf.mem.gib": number; + "golem.inf.storage.gib": number; + "golem.node.debug.subnet": string; + "golem.node.id.name": string; + "golem.node.net.is-public": boolean; + "golem.runtime.capabilities": string[]; + "golem.runtime.name": string; + "golem.runtime.version": string; + "golem.srv.caps.multi-activity": boolean; + "golem.srv.caps.payload-manifest": boolean; +}; + export interface ProposalDetails { - transferProtocol: string; + transferProtocol: string[]; cpuBrand: string; cpuCapabilities: string[]; cpuCores: number; @@ -18,14 +63,6 @@ export interface ProposalDetails { state: ProposalAllOfStateEnum; } -export interface ProposalDTO { - id: string; - issuerId: string; - provider: { id: string; name: string }; - properties: object; - constraints: string; -} - /** * Proposal module - an object representing an offer in the state of a proposal from the provider. * @hidden @@ -34,7 +71,7 @@ export class Proposal { id: string; readonly issuerId: string; readonly provider: { id: string; name: string }; - readonly properties: object; + readonly properties: ProposalProperties; readonly constraints: string; readonly timestamp: string; counteringProposalId: string | null; @@ -63,13 +100,16 @@ export class Proposal { ) { this.id = model.proposalId; this.issuerId = model.issuerId; - this.properties = model.properties; + this.properties = model.properties as ProposalProperties; this.constraints = model.constraints; this.state = model.state; this.prevProposalId = model.prevProposalId; this.timestamp = model.timestamp; this.counteringProposalId = null; this.provider = { id: this.issuerId, name: this.details.providerName }; + + // Run validation to ensure that the Proposal is in a complete and correct state + this.validate(); } get details(): ProposalDetails { @@ -88,16 +128,55 @@ export class Proposal { state: this.state, }; } - get dto(): ProposalDTO { + + get pricing(): PricingInfo { + const usageVector = this.properties["golem.com.usage.vector"]; + const priceVector = this.properties["golem.com.pricing.model.linear.coeffs"]; + + const envIdx = usageVector.findIndex((ele) => ele === "golem.usage.duration_sec"); + const cpuIdx = usageVector.findIndex((ele) => ele === "golem.usage.cpu_sec"); + + const envSec = priceVector[envIdx] ?? 0.0; + const cpuSec = priceVector[cpuIdx] ?? 0.0; + const start = priceVector[priceVector.length - 1]; + return { - id: this.id, - issuerId: this.issuerId, - provider: this.provider, - properties: this.properties, - constraints: this.constraints, + cpuSec, + envSec, + start, }; } + /** + * Validates if the proposal satisfies basic business rules, is complete and thus safe to interact with + * + * Use this method before executing any important logic, to ensure that you're working with correct, complete data + */ + protected validate(): void | never { + const usageVector = this.properties["golem.com.usage.vector"]; + const priceVector = this.properties["golem.com.pricing.model.linear.coeffs"]; + + if (!usageVector || usageVector.length === 0) { + throw new Error("Broken proposal: the `golem.com.usage.vector` does not contain price information"); + } + + if (!priceVector || priceVector.length === 0) { + throw new Error( + "Broken proposal: the `golem.com.pricing.model.linear.coeffs` does not contain pricing information", + ); + } + + if (usageVector.length < priceVector.length - 1) { + throw new Error( + "Broken proposal: the `golem.com.usage.vector` has less pricing information than `golem.com.pricing.model.linear.coeffs`", + ); + } + + if (priceVector.length < usageVector.length) { + throw new Error("Broken proposal: the `golem.com.pricing.model.linear.coeffs` should contain 3 price values"); + } + } + isInitial(): boolean { return this.state === ProposalAllOfStateEnum.Initial; } diff --git a/src/market/service.ts b/src/market/service.ts index 06c46be04..fd998bd99 100644 --- a/src/market/service.ts +++ b/src/market/service.ts @@ -1,12 +1,12 @@ import { Logger, sleep } from "../utils"; import { Package } from "../package"; -import { Proposal, ProposalDTO } from "./proposal"; +import { Proposal } from "./proposal"; import { AgreementPoolService } from "../agreement"; import { Allocation } from "../payment"; import { Demand, DemandEvent, DemandEventType, DemandOptions } from "./demand"; import { MarketConfig } from "./config"; -export type ProposalFilter = (proposal: ProposalDTO) => Promise; +export type ProposalFilter = (proposal: Proposal) => Promise | boolean; export interface MarketOptions extends DemandOptions { /** A custom filter that checks every proposal coming from the market */ @@ -120,7 +120,7 @@ export class MarketService { } private async processDraftProposal(proposal: Proposal) { - this.agreementPoolService.addProposal(proposal); + await this.agreementPoolService.addProposal(proposal); this.logger?.debug( `Proposal has been confirmed with provider ${proposal.issuerId} and added to agreement pool (${proposal.id})`, ); diff --git a/src/market/strategy.ts b/src/market/strategy.ts index eff189156..b517a8c21 100644 --- a/src/market/strategy.ts +++ b/src/market/strategy.ts @@ -1,28 +1,49 @@ -import { ProposalDTO } from "./proposal"; +import { Proposal } from "./proposal"; /** Default Proposal filter that accept all proposal coming from the market */ export const acceptAllProposalFilter = () => async () => true; /** Proposal filter blocking every offer coming from a provider whose id is in the array */ -export const blackListProposalIdsFilter = (blackListIds: string[]) => async (proposal: ProposalDTO) => +export const blackListProposalIdsFilter = (blackListIds: string[]) => async (proposal: Proposal) => !blackListIds.includes(proposal.issuerId); /** Proposal filter blocking every offer coming from a provider whose name is in the array */ -export const blackListProposalNamesFilter = (blackListNames: string[]) => async (proposal: ProposalDTO) => +export const blackListProposalNamesFilter = (blackListNames: string[]) => async (proposal: Proposal) => !blackListNames.includes(proposal.provider.name); /** Proposal filter blocking every offer coming from a provider whose name match to the regexp */ -export const blackListProposalRegexpFilter = (regexp: RegExp) => async (proposal: ProposalDTO) => +export const blackListProposalRegexpFilter = (regexp: RegExp) => async (proposal: Proposal) => !proposal.provider.name.match(regexp); /** Proposal filter that only allows offers from a provider whose id is in the array */ -export const whiteListProposalIdsFilter = (whiteListIds: string[]) => async (proposal: ProposalDTO) => +export const whiteListProposalIdsFilter = (whiteListIds: string[]) => async (proposal: Proposal) => whiteListIds.includes(proposal.issuerId); /** Proposal filter that only allows offers from a provider whose name is in the array */ -export const whiteListProposalNamesFilter = (whiteListNames: string[]) => async (proposal: ProposalDTO) => +export const whiteListProposalNamesFilter = (whiteListNames: string[]) => async (proposal: Proposal) => whiteListNames.includes(proposal.provider.name); /** Proposal filter that only allows offers from a provider whose name match to the regexp */ -export const whiteListProposalRegexpFilter = (regexp: RegExp) => async (proposal: ProposalDTO) => +export const whiteListProposalRegexpFilter = (regexp: RegExp) => async (proposal: Proposal) => !!proposal.provider.name.match(regexp); + +type PriceLimits = { + start: number; + cpuPerSec: number; + envPerSec: number; +}; + +/** + * Proposal filter only allowing offers that do not exceed the defined usage + * + * @param priceLimits.start The maximum start price in GLM + * @param priceLimits.cpuPerSec The maximum price for CPU usage in GLM/s + * @param priceLimits.envPerSec The maximum price for the duration of the activity in GLM/s + */ +export const limitPriceFilter = (priceLimits: PriceLimits) => async (proposal: Proposal) => { + return ( + proposal.pricing.cpuSec < priceLimits.cpuPerSec && + proposal.pricing.envSec <= priceLimits.envPerSec && + proposal.pricing.start <= priceLimits.start + ); +}; diff --git a/src/script/command.ts b/src/script/command.ts index 9b7760ad6..397cd47f9 100644 --- a/src/script/command.ts +++ b/src/script/command.ts @@ -2,12 +2,12 @@ import { ExeScriptRequest } from "ya-ts-client/dist/ya-activity/src/models"; import { StorageProvider } from "../storage"; import { Result, ResultState } from "../activity"; -const EmptyErrorResult: Result = { - result: ResultState.ERROR, +const EmptyErrorResult = new Result({ + result: ResultState.Error, eventDate: new Date().toISOString(), index: -1, message: "No result due to error", -}; +}); /** * @hidden @@ -50,8 +50,8 @@ export class Command { * * @param result */ - async after(result?: Result): Promise> { - return (result ?? EmptyErrorResult) as Result; + async after(result?: Result): Promise> { + return result ?? EmptyErrorResult; } } @@ -213,18 +213,18 @@ export class DownloadData extends Transfer { async after(result: Result): Promise> { await this.storageProvider.release([this.args["to"]]); - if (result.result === ResultState.OK) { - return { + if (result.result === ResultState.Ok) { + return new Result({ ...result, data: this.combineChunks(), - }; + }); } - return { + return new Result({ ...result, - result: ResultState.ERROR, + result: ResultState.Error, data: undefined, - }; + }); } private combineChunks(): Uint8Array { diff --git a/src/script/script.ts b/src/script/script.ts index a52a0fe3c..e3f9df954 100644 --- a/src/script/script.ts +++ b/src/script/script.ts @@ -6,11 +6,11 @@ import { Result } from "../activity"; * @hidden */ export class Script { - static create(commands?: Command[]): Script { + static create(commands?: Command[]): Script { return new Script(commands); } - constructor(private commands: Command[] = []) {} + constructor(private commands: Command[] = []) {} add(command: Command) { this.commands.push(command); } @@ -18,7 +18,7 @@ export class Script { await Promise.all(this.commands.map((cmd) => cmd.before())); } - async after(results: Result[]): Promise[]> { + async after(results: Result[]): Promise { // Call after() for each command mapping its result. return Promise.all(this.commands.map((command, i) => command.after(results[i]))); } diff --git a/src/task/service.ts b/src/task/service.ts index b95ec497a..78096a245 100644 --- a/src/task/service.ts +++ b/src/task/service.ts @@ -3,7 +3,7 @@ import { TaskQueue } from "./queue"; import { WorkContext } from "./work"; import { Logger, sleep } from "../utils"; import { StorageProvider } from "../storage"; -import { AgreementPoolService } from "../agreement"; +import { Agreement, AgreementPoolService } from "../agreement"; import { PaymentService } from "../payment"; import { NetworkService } from "../network"; import { Activity, ActivityOptions } from "../activity"; @@ -73,15 +73,11 @@ export class TaskService { task.start(); this.logger?.debug(`Starting task. ID: ${task.id}, Data: ${task.getData()}`); ++this.activeTasksCount; + const agreement = await this.agreementPoolService.getAgreement(); - let activity; + const activity = await this.getOrCreateActivity(agreement); + try { - if (this.activities.has(agreement.id)) { - activity = this.activities.get(agreement.id); - } else { - activity = await Activity.create(agreement.id, this.options); - this.activities.set(agreement.id, activity); - } this.options.eventTarget?.dispatchEvent( new Events.TaskStarted({ id: task.id, @@ -91,11 +87,13 @@ export class TaskService { providerName: agreement.provider.name, }), ); + this.logger?.info( `Task ${task.id} sent to provider ${agreement.provider.name}.${ task.getData() ? " Data: " + task.getData() : "" }`, ); + this.paymentService.acceptDebitNotes(agreement.id); this.paymentService.acceptPayments(agreement); const initWorker = task.getInitWorker(); @@ -134,7 +132,7 @@ export class TaskService { this.options.eventTarget?.dispatchEvent( new Events.TaskRedone({ id: task.id, - activityId: activity?.id, + activityId: activity.id, agreementId: agreement.id, providerId: agreement.provider.id, providerName: agreement.provider.name, @@ -158,11 +156,22 @@ export class TaskService { ); throw new Error(`Task ${task.id} has been rejected! ${reason}`); } - await activity?.stop().catch((actError) => this.logger?.debug(actError)); + await activity.stop().catch((actError) => this.logger?.debug(actError)); this.activities.delete(agreement.id); } finally { --this.activeTasksCount; } await this.agreementPoolService.releaseAgreement(agreement.id, task.isDone()).catch((e) => this.logger?.debug(e)); } + + private async getOrCreateActivity(agreement: Agreement) { + const previous = this.activities.get(agreement.id); + if (previous) { + return previous; + } else { + const activity = await Activity.create(agreement.id, this.options); + this.activities.set(agreement.id, activity); + return activity; + } + } } diff --git a/src/task/work.spec.ts b/src/task/work.spec.ts index 5f65972e9..8bffaec17 100644 --- a/src/task/work.spec.ts +++ b/src/task/work.spec.ts @@ -119,7 +119,7 @@ describe("Work Context", () => { }); const result = await context.downloadJson("/golem/file.txt"); - expect(result.result).toEqual(ResultState.OK); + expect(result.result).toEqual(ResultState.Ok); expect(result.data).toEqual(json); }); }); @@ -152,15 +152,15 @@ describe("Work Context", () => { jest.spyOn(Script.prototype, "before").mockResolvedValue(undefined); activity.mockResults([ActivityMock.createResult({ stdout: "SUCCESS" })]); const result = await context["runOneCommand"](new Run("test")); - expect(result.result).toEqual(ResultState.OK); + expect(result.result).toEqual(ResultState.Ok); expect(result.stdout).toEqual("SUCCESS"); }); it("should handle error result", async () => { jest.spyOn(Script.prototype, "before").mockResolvedValue(undefined); - activity.mockResults([ActivityMock.createResult({ result: ResultState.ERROR, stdout: "FAILURE" })]); + activity.mockResults([ActivityMock.createResult({ result: ResultState.Error, stdout: "FAILURE" })]); const result = await context["runOneCommand"](new Run("test")); - expect(result.result).toEqual(ResultState.ERROR); + expect(result.result).toEqual(ResultState.Error); expect(result.stdout).toEqual("FAILURE"); await logger.expectToInclude("Task error on provider"); }); diff --git a/src/task/work.ts b/src/task/work.ts index 0a8fb04da..da53384a2 100644 --- a/src/task/work.ts +++ b/src/task/work.ts @@ -158,19 +158,19 @@ export class WorkContext { } // eslint-disable-next-line @typescript-eslint/no-explicit-any - async downloadJson(src: string, options?: CommandOptions): Promise> { + async downloadJson(src: string, options?: CommandOptions): Promise { const result = await this.downloadData(src, options); - if (result.result !== ResultState.OK) { - return { + if (result.result !== ResultState.Ok) { + return new Result({ ...result, data: undefined, - }; + }); } - return { + return new Result({ ...result, data: JSON.parse(new TextDecoder().decode(result.data)), - }; + }); } beginBatch() { @@ -217,13 +217,16 @@ export class WorkContext { // Process result. let allResults: Result[] = []; for await (const result of results) allResults.push(result); - allResults = (await script.after(allResults)) as Result[]; + allResults = await script.after(allResults); // Handle errors. const commandsErrors = allResults.filter((res) => res.result === "Error"); if (commandsErrors.length) { const errorMessage = commandsErrors - .map((err) => `Error: ${err.message}. Stdout: ${err.stdout?.trim()}. Stderr: ${err.stderr?.trim()}`) + .map( + (err) => + `Error: ${err.message}. Stdout: ${err.stdout?.toString().trim()}. Stderr: ${err.stderr?.toString().trim()}`, + ) .join(". "); this.logger?.warn(`Task error on provider ${this.provider?.name || "'unknown'"}. ${errorMessage}`); } diff --git a/src/utils/logger/logger.ts b/src/utils/logger/logger.ts index f6c455622..7d28030b6 100644 --- a/src/utils/logger/logger.ts +++ b/src/utils/logger/logger.ts @@ -1,9 +1,9 @@ export enum LogLevel { - debug = "debug", - info = "info", - warn = "warn", - error = "error", - log = "log", + Debug = "debug", + Info = "info", + Warn = "warn", + Error = "error", + Log = "log", } export interface Logger { level: string; diff --git a/tests/e2e/strategies.spec.ts b/tests/e2e/strategies.spec.ts index dead03f99..ab1e7141d 100644 --- a/tests/e2e/strategies.spec.ts +++ b/tests/e2e/strategies.spec.ts @@ -17,7 +17,7 @@ describe("Strategies", function () { const data = ["one", "two", "three"]; const results = executor.map(data, async (ctx, x) => { const res = await ctx.run(`echo "${x}"`); - return res.stdout?.trim(); + return res.stdout?.toString().trim(); }); const finalOutputs: string[] = []; for await (const res of results) if (res) finalOutputs.push(res); @@ -38,7 +38,7 @@ describe("Strategies", function () { const data = ["one", "two", "three"]; const results = executor.map(data, async (ctx, x) => { const res = await ctx.run(`echo "${x}"`); - return res.stdout?.trim(); + return res.stdout?.toString().trim(); }); const finalOutputs: string[] = []; for await (const res of results) if (res) finalOutputs.push(res); @@ -60,7 +60,7 @@ describe("Strategies", function () { const data = ["one", "two"]; const results = executor.map(data, async (ctx, x) => { const res = await ctx.run(`echo "${x}"`); - return res.stdout?.trim(); + return res.stdout?.toString().trim(); }); const finalOutputs: string[] = []; for await (const res of results) if (res) finalOutputs.push(res); @@ -78,7 +78,7 @@ describe("Strategies", function () { const data = ["one", "two"]; const results = executor.map(data, async (ctx, x) => { const res = await ctx.run(`echo "${x}"`); - return res.stdout?.trim(); + return res.stdout?.toString().trim(); }); const finalOutputs: string[] = []; for await (const res of results) if (res) finalOutputs.push(res); diff --git a/tests/e2e/tasks.spec.ts b/tests/e2e/tasks.spec.ts index 7d7106a5e..0b89e6520 100644 --- a/tests/e2e/tasks.spec.ts +++ b/tests/e2e/tasks.spec.ts @@ -68,7 +68,7 @@ describe("Task Executor", function () { const data = ["one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"]; const results = executor.map(data, async (ctx, x) => { const res = await ctx.run(`echo "${x}"`); - return res.stdout?.trim(); + return res.stdout?.toString().trim(); }); const finalOutputs: string[] = []; for await (const res of results) if (res) finalOutputs.push(res); @@ -83,7 +83,7 @@ describe("Task Executor", function () { const data = ["one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"]; await executor.forEach(data, async (ctx, x) => { const res = await ctx.run(`echo "${x}"`); - expect(data).toContain(res?.stdout?.trim()); + expect(data).toContain(res?.stdout?.toString().trim()); }); }); @@ -102,7 +102,7 @@ describe("Task Executor", function () { .run('echo "Hello World"') .run('echo "OK"') .endStream(); - results.on("data", ({ stdout }) => outputs.push(stdout?.trim())); + results.on("data", ({ stdout }) => outputs.push(stdout.toString().trim())); results.on("close", () => (onEnd = "END")); }) .catch((e) => { @@ -130,7 +130,7 @@ describe("Task Executor", function () { .run('echo "Hello World"') .run('echo "OK"') .end(); - results.map((r) => outputs.push(r?.stdout?.trim() ?? "Missing STDOUT!")); + results.map((r) => outputs.push(r?.stdout?.toString().trim() ?? "Missing STDOUT!")); }) .catch((e) => { expect(e).toBeUndefined(); diff --git a/tests/e2e/yacat.spec.ts b/tests/e2e/yacat.spec.ts index 846ad98af..cce71220c 100644 --- a/tests/e2e/yacat.spec.ts +++ b/tests/e2e/yacat.spec.ts @@ -23,7 +23,7 @@ describe("Password cracking", function () { }); const keyspace = await executor.run(async (ctx) => { const result = await ctx.run(`hashcat --keyspace -a 3 ${mask} -m 400`); - return parseInt(result.stdout || ""); + return parseInt(result.stdout?.toString() || ""); }); expect(keyspace).toEqual(95); if (!keyspace) return; @@ -36,7 +36,7 @@ describe("Password cracking", function () { .run("cat pass.potfile") .end(); if (!results?.[1]?.stdout) return false; - return results?.[1]?.stdout.split(":")?.[1]?.trim(); + return results?.[1]?.stdout.toString().split(":")?.[1]?.trim(); }); let password = ""; for await (const result of results) { diff --git a/tests/mock/activity.mock.ts b/tests/mock/activity.mock.ts index 51f1c243b..94498d64a 100644 --- a/tests/mock/activity.mock.ts +++ b/tests/mock/activity.mock.ts @@ -9,12 +9,12 @@ export class ActivityMock extends Activity { private results: (Result | Error)[] = []; static createResult(props?: Partial): Result { - return { - result: ResultState.OK, + return new Result({ + result: ResultState.Ok, index: 1, eventDate: new Date().toISOString(), ...props, - }; + }); } constructor(id?: string, agreementId?: string, options?: ActivityConfig) { diff --git a/tests/unit/agreement_pool_service.test.ts b/tests/unit/agreement_pool_service.test.ts index f72c3a925..221a54de2 100644 --- a/tests/unit/agreement_pool_service.test.ts +++ b/tests/unit/agreement_pool_service.test.ts @@ -1,35 +1,44 @@ +jest.mock("ya-ts-client/dist/ya-market/api"); + import { LoggerMock } from "../mock"; import { Agreement, AgreementPoolService } from "../../src/agreement"; import { RequestorApi } from "ya-ts-client/dist/ya-market/api"; import { Proposal as ProposalModel } from "ya-ts-client/dist/ya-market/src/models/proposal"; -import { DemandOfferBase } from "ya-ts-client/dist/ya-market"; +import { ProposalAllOfStateEnum } from "ya-ts-client/dist/ya-market"; import { Proposal } from "../../src/market/proposal"; const logger = new LoggerMock(); +const mockAPI = new RequestorApi(); +const mockSetCounteringProposalReference = jest.fn(); const createProposal = (id) => { - return new Proposal( - id, - null, - () => {}, - {} as RequestorApi, - { - properties: { - "golem.activity.caps.transfer.protocol": "protocol", - "golem.inf.cpu.brand": "cpu_brand", - "golem.inf.cpu.capabilities": "cpu_capabilities", - "golem.inf.cpu.cores": "cpu_cores", - "golem.inf.cpu.threads": "cpu_threads", - "golem.inf.mem.gib": "mem_gib", - "golem.inf.storage.gib": "storage_gib", - "golem.node.id.name": "node_id_name", - "golem.node.net.is-public": true, - "golem.runtime.capabilities": ["a", "b", "c"], - "golem.runtime.name": "runtime_name", - }, - } as ProposalModel, - {} as DemandOfferBase, - ); + const model: ProposalModel = { + constraints: "", + issuerId: "", + proposalId: "", + state: ProposalAllOfStateEnum.Initial, + timestamp: "", + properties: { + "golem.activity.caps.transfer.protocol": "protocol", + "golem.inf.cpu.brand": "cpu_brand", + "golem.inf.cpu.capabilities": "cpu_capabilities", + "golem.inf.cpu.cores": "cpu_cores", + "golem.inf.cpu.threads": "cpu_threads", + "golem.inf.mem.gib": "mem_gib", + "golem.inf.storage.gib": "storage_gib", + "golem.node.id.name": "node_id_name", + "golem.node.net.is-public": true, + "golem.runtime.capabilities": ["a", "b", "c"], + "golem.runtime.name": "runtime_name", + "golem.com.usage.vector": ["golem.usage.duration_sec", "golem.usage.cpu_sec"], + "golem.com.pricing.model.linear.coeffs": [0.1, 0.2, 0.0], + }, + }; + + return new Proposal(id, null, mockSetCounteringProposalReference, mockAPI, model, { + constraints: "", + properties: {}, + }); }; describe("Agreement Pool Service", () => {