From f2e6a87403e3713dbb337146c9d876ec45ceac55 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 22 Jul 2024 08:33:53 +0000 Subject: [PATCH 01/14] build(deps): bump the prod-deps-regular group with 4 updates Bumps the prod-deps-regular group with 4 updates: [semver](https://github.com/npm/node-semver), [@rollup/rollup-darwin-x64](https://github.com/rollup/rollup), [@rollup/rollup-win32-arm64-msvc](https://github.com/rollup/rollup) and [@rollup/rollup-win32-x64-msvc](https://github.com/rollup/rollup). Updates `semver` from 7.6.2 to 7.6.3 - [Release notes](https://github.com/npm/node-semver/releases) - [Changelog](https://github.com/npm/node-semver/blob/main/CHANGELOG.md) - [Commits](https://github.com/npm/node-semver/compare/v7.6.2...v7.6.3) Updates `@rollup/rollup-darwin-x64` from 4.18.1 to 4.19.0 - [Release notes](https://github.com/rollup/rollup/releases) - [Changelog](https://github.com/rollup/rollup/blob/master/CHANGELOG.md) - [Commits](https://github.com/rollup/rollup/compare/v4.18.1...v4.19.0) Updates `@rollup/rollup-win32-arm64-msvc` from 4.18.1 to 4.19.0 - [Release notes](https://github.com/rollup/rollup/releases) - [Changelog](https://github.com/rollup/rollup/blob/master/CHANGELOG.md) - [Commits](https://github.com/rollup/rollup/compare/v4.18.1...v4.19.0) Updates `@rollup/rollup-win32-x64-msvc` from 4.18.1 to 4.19.0 - [Release notes](https://github.com/rollup/rollup/releases) - [Changelog](https://github.com/rollup/rollup/blob/master/CHANGELOG.md) - [Commits](https://github.com/rollup/rollup/compare/v4.18.1...v4.19.0) --- updated-dependencies: - dependency-name: semver dependency-type: direct:production update-type: version-update:semver-patch dependency-group: prod-deps-regular - dependency-name: "@rollup/rollup-darwin-x64" dependency-type: direct:production update-type: version-update:semver-minor dependency-group: prod-deps-regular - dependency-name: "@rollup/rollup-win32-arm64-msvc" dependency-type: direct:production update-type: version-update:semver-minor dependency-group: prod-deps-regular - dependency-name: "@rollup/rollup-win32-x64-msvc" dependency-type: direct:production update-type: version-update:semver-minor dependency-group: prod-deps-regular ... Signed-off-by: dependabot[bot] --- package-lock.json | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/package-lock.json b/package-lock.json index c812c221d..d36f99776 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,6 +13,7 @@ ], "dependencies": { "@golem-sdk/pino-logger": "^1.1.0", + "@rollup/rollup-win32-x64-msvc": "^4", "async-lock": "^1.4.1", "async-retry": "^1.3.3", "axios": "^1.6.7", @@ -2965,9 +2966,9 @@ ] }, "node_modules/@rollup/rollup-darwin-x64": { - "version": "4.18.1", - "resolved": "https://registry.npmjs.org/@rollup/rollup-darwin-x64/-/rollup-darwin-x64-4.18.1.tgz", - "integrity": "sha512-IgpzXKauRe1Tafcej9STjSSuG0Ghu/xGYH+qG6JwsAUxXrnkvNHcq/NL6nz1+jzvWAnQkuAJ4uIwGB48K9OCGA==", + "version": "4.19.0", + "resolved": "https://registry.npmjs.org/@rollup/rollup-darwin-x64/-/rollup-darwin-x64-4.19.0.tgz", + "integrity": "sha512-fO28cWA1dC57qCd+D0rfLC4VPbh6EOJXrreBmFLWPGI9dpMlER2YwSPZzSGfq11XgcEpPukPTfEVFtw2q2nYJg==", "cpu": [ "x64" ], @@ -3081,9 +3082,9 @@ ] }, "node_modules/@rollup/rollup-win32-arm64-msvc": { - "version": "4.18.1", - "resolved": "https://registry.npmjs.org/@rollup/rollup-win32-arm64-msvc/-/rollup-win32-arm64-msvc-4.18.1.tgz", - "integrity": "sha512-W2ZNI323O/8pJdBGil1oCauuCzmVd9lDmWBBqxYZcOqWD6aWqJtVBQ1dFrF4dYpZPks6F+xCZHfzG5hYlSHZ6g==", + "version": "4.19.0", + "resolved": "https://registry.npmjs.org/@rollup/rollup-win32-arm64-msvc/-/rollup-win32-arm64-msvc-4.19.0.tgz", + "integrity": "sha512-HxDMKIhmcguGTiP5TsLNolwBUK3nGGUEoV/BO9ldUBoMLBssvh4J0X8pf11i1fTV7WShWItB1bKAKjX4RQeYmg==", "cpu": [ "arm64" ], @@ -3106,9 +3107,9 @@ ] }, "node_modules/@rollup/rollup-win32-x64-msvc": { - "version": "4.18.1", - "resolved": "https://registry.npmjs.org/@rollup/rollup-win32-x64-msvc/-/rollup-win32-x64-msvc-4.18.1.tgz", - "integrity": "sha512-yjk2MAkQmoaPYCSu35RLJ62+dz358nE83VfTePJRp8CG7aMg25mEJYpXFiD+NcevhX8LxD5OP5tktPXnXN7GDw==", + "version": "4.19.0", + "resolved": "https://registry.npmjs.org/@rollup/rollup-win32-x64-msvc/-/rollup-win32-x64-msvc-4.19.0.tgz", + "integrity": "sha512-xNo5fV5ycvCCKqiZcpB65VMR11NJB+StnxHz20jdqRAktfdfzhgjTiJ2doTDQE/7dqGaV5I7ZGqKpgph6lCIag==", "cpu": [ "x64" ], @@ -16496,9 +16497,9 @@ } }, "node_modules/semver": { - "version": "7.6.2", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.6.2.tgz", - "integrity": "sha512-FNAIBWCx9qcRhoHcgcJ0gvU7SN1lYU2ZXuSfl04bSC5OpvDHFyJCjdNHomPXxjQlCBU67YW64PzY7/VIEH7F2w==", + "version": "7.6.3", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.6.3.tgz", + "integrity": "sha512-oVekP1cKtI+CTDvHWYFUcMtsK/00wmAEfyqKfNdARm8u1wNVhSgaX7A8d4UuIlUI5e84iEwOhs7ZPYRmzU9U6A==", "bin": { "semver": "bin/semver.js" }, From 1dc806ff106969ecb51ba9b328d4aff1ecf7de71 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 29 Jul 2024 09:00:01 +0000 Subject: [PATCH 02/14] build(deps): bump the prod-deps-regular group with 4 updates Bumps the prod-deps-regular group with 4 updates: [debug](https://github.com/debug-js/debug), [@rollup/rollup-darwin-x64](https://github.com/rollup/rollup), [@rollup/rollup-win32-arm64-msvc](https://github.com/rollup/rollup) and [@rollup/rollup-win32-x64-msvc](https://github.com/rollup/rollup). Updates `debug` from 4.3.5 to 4.3.6 - [Release notes](https://github.com/debug-js/debug/releases) - [Commits](https://github.com/debug-js/debug/compare/4.3.5...4.3.6) Updates `@rollup/rollup-darwin-x64` from 4.19.0 to 4.19.1 - [Release notes](https://github.com/rollup/rollup/releases) - [Changelog](https://github.com/rollup/rollup/blob/master/CHANGELOG.md) - [Commits](https://github.com/rollup/rollup/compare/v4.19.0...v4.19.1) Updates `@rollup/rollup-win32-arm64-msvc` from 4.19.0 to 4.19.1 - [Release notes](https://github.com/rollup/rollup/releases) - [Changelog](https://github.com/rollup/rollup/blob/master/CHANGELOG.md) - [Commits](https://github.com/rollup/rollup/compare/v4.19.0...v4.19.1) Updates `@rollup/rollup-win32-x64-msvc` from 4.19.0 to 4.19.1 - [Release notes](https://github.com/rollup/rollup/releases) - [Changelog](https://github.com/rollup/rollup/blob/master/CHANGELOG.md) - [Commits](https://github.com/rollup/rollup/compare/v4.19.0...v4.19.1) --- updated-dependencies: - dependency-name: debug dependency-type: direct:production update-type: version-update:semver-patch dependency-group: prod-deps-regular - dependency-name: "@rollup/rollup-darwin-x64" dependency-type: direct:production update-type: version-update:semver-patch dependency-group: prod-deps-regular - dependency-name: "@rollup/rollup-win32-arm64-msvc" dependency-type: direct:production update-type: version-update:semver-patch dependency-group: prod-deps-regular - dependency-name: "@rollup/rollup-win32-x64-msvc" dependency-type: direct:production update-type: version-update:semver-patch dependency-group: prod-deps-regular ... Signed-off-by: dependabot[bot] --- package-lock.json | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/package-lock.json b/package-lock.json index d36f99776..aca67db04 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2966,9 +2966,9 @@ ] }, "node_modules/@rollup/rollup-darwin-x64": { - "version": "4.19.0", - "resolved": "https://registry.npmjs.org/@rollup/rollup-darwin-x64/-/rollup-darwin-x64-4.19.0.tgz", - "integrity": "sha512-fO28cWA1dC57qCd+D0rfLC4VPbh6EOJXrreBmFLWPGI9dpMlER2YwSPZzSGfq11XgcEpPukPTfEVFtw2q2nYJg==", + "version": "4.19.1", + "resolved": "https://registry.npmjs.org/@rollup/rollup-darwin-x64/-/rollup-darwin-x64-4.19.1.tgz", + "integrity": "sha512-4T42heKsnbjkn7ovYiAdDVRRWZLU9Kmhdt6HafZxFcUdpjlBlxj4wDrt1yFWLk7G4+E+8p2C9tcmSu0KA6auGA==", "cpu": [ "x64" ], @@ -3082,9 +3082,9 @@ ] }, "node_modules/@rollup/rollup-win32-arm64-msvc": { - "version": "4.19.0", - "resolved": "https://registry.npmjs.org/@rollup/rollup-win32-arm64-msvc/-/rollup-win32-arm64-msvc-4.19.0.tgz", - "integrity": "sha512-HxDMKIhmcguGTiP5TsLNolwBUK3nGGUEoV/BO9ldUBoMLBssvh4J0X8pf11i1fTV7WShWItB1bKAKjX4RQeYmg==", + "version": "4.19.1", + "resolved": "https://registry.npmjs.org/@rollup/rollup-win32-arm64-msvc/-/rollup-win32-arm64-msvc-4.19.1.tgz", + "integrity": "sha512-88brja2vldW/76jWATlBqHEoGjJLRnP0WOEKAUbMcXaAZnemNhlAHSyj4jIwMoP2T750LE9lblvD4e2jXleZsA==", "cpu": [ "arm64" ], @@ -3107,9 +3107,9 @@ ] }, "node_modules/@rollup/rollup-win32-x64-msvc": { - "version": "4.19.0", - "resolved": "https://registry.npmjs.org/@rollup/rollup-win32-x64-msvc/-/rollup-win32-x64-msvc-4.19.0.tgz", - "integrity": "sha512-xNo5fV5ycvCCKqiZcpB65VMR11NJB+StnxHz20jdqRAktfdfzhgjTiJ2doTDQE/7dqGaV5I7ZGqKpgph6lCIag==", + "version": "4.19.1", + "resolved": "https://registry.npmjs.org/@rollup/rollup-win32-x64-msvc/-/rollup-win32-x64-msvc-4.19.1.tgz", + "integrity": "sha512-2bIrL28PcK3YCqD9anGxDxamxdiJAxA+l7fWIwM5o8UqNy1t3d1NdAweO2XhA0KTDJ5aH1FsuiT5+7VhtHliXg==", "cpu": [ "x64" ], @@ -6578,9 +6578,9 @@ "license": "MIT" }, "node_modules/debug": { - "version": "4.3.5", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.5.tgz", - "integrity": "sha512-pt0bNEmneDIvdL1Xsd9oDQ/wrQRkXDT4AUWlNZNPKvW5x/jyO9VFXkJUP07vQ2upmw5PlaITaPKc31jK13V+jg==", + "version": "4.3.6", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.6.tgz", + "integrity": "sha512-O/09Bd4Z1fBrU4VzkhFqVgpPzaGbw6Sm9FEkBT1A/YBXQFGuuSxa1dN2nxgxS34JmKXqYx8CZAwEVoJFImUXIg==", "dependencies": { "ms": "2.1.2" }, From c34d40ca9d0fdedb721600ae00ff4fa408e1de15 Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Wed, 31 Jul 2024 11:36:37 +0200 Subject: [PATCH 03/14] refactor: rename scanned-proposal to scanned-offer to match class name --- src/market/scan/index.ts | 2 +- .../scan/{scanned-proposal.ts => scanned-offer.ts} | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) rename src/market/scan/{scanned-proposal.ts => scanned-offer.ts} (99%) diff --git a/src/market/scan/index.ts b/src/market/scan/index.ts index a6e78b177..852270e48 100644 --- a/src/market/scan/index.ts +++ b/src/market/scan/index.ts @@ -1,3 +1,3 @@ export * from "./types"; export * from "./scan-director"; -export * from "./scanned-proposal"; +export * from "./scanned-offer"; diff --git a/src/market/scan/scanned-proposal.ts b/src/market/scan/scanned-offer.ts similarity index 99% rename from src/market/scan/scanned-proposal.ts rename to src/market/scan/scanned-offer.ts index c0797b28c..fb75e7c9e 100644 --- a/src/market/scan/scanned-proposal.ts +++ b/src/market/scan/scanned-offer.ts @@ -51,33 +51,43 @@ export class ScannedOffer { name: this.properties["golem.node.id.name"] || "", }; } + get transferProtocol() { return this.properties["golem.activity.caps.transfer.protocol"]; } + get cpuBrand() { return this.properties["golem.inf.cpu.brand"]; } + get cpuCapabilities() { return this.properties["golem.inf.cpu.capabilities"]; } + get cpuCores() { return this.properties["golem.inf.cpu.cores"]; } + get cpuThreads() { return this.properties["golem.inf.cpu.threads"]; } + get memory() { return this.properties["golem.inf.mem.gib"]; } + get storage() { return this.properties["golem.inf.storage.gib"]; } + get publicNet() { return this.properties["golem.node.net.is-public"]; } + get runtimeCapabilities() { return this.properties["golem.runtime.capabilities"]; } + get runtimeName() { return this.properties["golem.runtime.name"]; } From c3682bd07a42ef3a56403a31a84c13e094f9d7fb Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Wed, 31 Jul 2024 11:41:20 +0200 Subject: [PATCH 04/14] fix(market): exposed memoryGib and storageGib from ScannedOffer, deprecating old methods --- src/market/scan/scanned-offer.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/market/scan/scanned-offer.ts b/src/market/scan/scanned-offer.ts index fb75e7c9e..56a581d3e 100644 --- a/src/market/scan/scanned-offer.ts +++ b/src/market/scan/scanned-offer.ts @@ -72,11 +72,21 @@ export class ScannedOffer { return this.properties["golem.inf.cpu.threads"]; } + /** @deprecated Use {@link memoryGib} instead */ get memory() { + return this.memoryGib; + } + + get memoryGib() { return this.properties["golem.inf.mem.gib"]; } + /** @deprecated Use {@link storageGib} instead */ get storage() { + return this.storageGib; + } + + get storageGib() { return this.properties["golem.inf.storage.gib"]; } From 94163595b09d86576ced9d040b9de4d00384ded0 Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Wed, 31 Jul 2024 11:43:20 +0200 Subject: [PATCH 05/14] feat(market): added offerId to ScannedOffer --- src/market/scan/scanned-offer.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/market/scan/scanned-offer.ts b/src/market/scan/scanned-offer.ts index 56a581d3e..50fd63dee 100644 --- a/src/market/scan/scanned-offer.ts +++ b/src/market/scan/scanned-offer.ts @@ -101,4 +101,15 @@ export class ScannedOffer { get runtimeName() { return this.properties["golem.runtime.name"]; } + + /** + * Get the ID of the offer published by the Provider + * + * Note: + * - this ID will change after the provider refreshes the offer (usually after 1h) + * - this ID will remain unchanged for the same published offer between different scans + */ + get offerId() { + return this.model.offerId; + } } From 56dd57521d5c1c5f79a1e39833140233ead9e277 Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Wed, 31 Jul 2024 12:28:54 +0200 Subject: [PATCH 06/14] feat(market): add GPU support properties and payment platform addresses Introduced GAP-35 GPU capability properties to the proposal properties. Added cpuVendor, gpuBrand, offerCreateAt, and paymentPlatformAddresses getters to the ScannedOffer class. --- src/market/proposal/proposal-properties.ts | 27 +++++++++++-- src/market/scan/scanned-offer.ts | 44 ++++++++++++++++++++++ 2 files changed, 67 insertions(+), 4 deletions(-) diff --git a/src/market/proposal/proposal-properties.ts b/src/market/proposal/proposal-properties.ts index 9503f9830..837c60686 100644 --- a/src/market/proposal/proposal-properties.ts +++ b/src/market/proposal/proposal-properties.ts @@ -20,10 +20,28 @@ export type GenericGolemProtocolPropertyType = Record; + +/** + * Properties defined by GAP-35 + * + * @link https://github.com/golemfactory/golem-architecture/blob/master/gaps/gap-35_gpu_pci_capability/gap-35_gpu_pci_capability.md + */ +export type Gap35GpuSupportProps = Partial<{ + "golem.!exp.gap-35.v1.inf.gpu.model": string; + "golem.!exp.gap-35.v1.inf.gpu.clocks.graphics.mhz": number; + "golem.!exp.gap-35.v1.inf.gpu.clocks.memory.mhz": number; + "golem.!exp.gap-35.v1.inf.gpu.clocks.sm.mhz": number; + "golem.!exp.gap-35.v1.inf.gpu.clocks.video.mhz": number; + "golem.!exp.gap-35.v1.inf.gpu.cuda.cores": number; + "golem.!exp.gap-35.v1.inf.gpu.cuda.enabled": boolean; + "golem.!exp.gap-35.v1.inf.gpu.cuda.version": string; + "golem.!exp.gap-35.v1.inf.gpu.memory.bandwidth.gib": number; + "golem.!exp.gap-35.v1.inf.gpu.memory.total.gib": number; +}>; /** * @link https://github.com/golemfactory/golem-architecture/tree/master/standards/0-commons */ @@ -98,6 +116,7 @@ export type ProposalProperties = StandardComputationPlatformProps & // Attach GAP specific property sets Gap3MidAgreementPaymentProps & + Gap35GpuSupportProps & /** * These are around byt not really specified in any standard * FIXME #yagna - Standardize? diff --git a/src/market/scan/scanned-offer.ts b/src/market/scan/scanned-offer.ts index 50fd63dee..939cf2526 100644 --- a/src/market/scan/scanned-offer.ts +++ b/src/market/scan/scanned-offer.ts @@ -4,6 +4,11 @@ import type { MarketApi } from "ya-ts-client"; type ScannedOfferDTO = MarketApi.OfferDTO; +type PaymentPlatformAddressSet = { + /** The payment platform and address map */ + [paymentPlatform: string]: string | undefined; +}; + export class ScannedOffer { constructor(private readonly model: ScannedOfferDTO) {} @@ -60,6 +65,10 @@ export class ScannedOffer { return this.properties["golem.inf.cpu.brand"]; } + get cpuVendor() { + return this.properties["golem.inf.cpu.vendor"]; + } + get cpuCapabilities() { return this.properties["golem.inf.cpu.capabilities"]; } @@ -72,6 +81,10 @@ export class ScannedOffer { return this.properties["golem.inf.cpu.threads"]; } + get gpuBrand() { + return this.properties["golem.!exp.gap-35.v1.inf.gpu.model"]; + } + /** @deprecated Use {@link memoryGib} instead */ get memory() { return this.memoryGib; @@ -112,4 +125,35 @@ export class ScannedOffer { get offerId() { return this.model.offerId; } + + /** + * The timestamp at which the offer was generated by the Provider + */ + get offerCreateAt() { + return this.model.timestamp; + } + + /** + * Lists down payment addresses on different payment platforms + * + * @example Example return value + * ```json + * { + * "erc20-polygon-glm": "0x8737beea5668595fda9d50e85cae9cad10b4c980", + * "erc20-holesky-tglm:" "0x8737beea5668595fda9d50e85cae9cad10b4c980", + * } + * ``` + */ + get paymentPlatformAddresses(): PaymentPlatformAddressSet { + const platformProps = Object.entries(this.model.properties).filter(([key]) => + key.startsWith("golem.com.payment.platform."), + ); + + const platformAddress = platformProps.map(([key, address]) => [ + key.replace("golem.com.payment.platform.", "").replace(".address", ""), + address, + ]); + + return Object.fromEntries(platformAddress); + } } From 50633ef295ff86457f0856165d76ff540876d0b0 Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Wed, 31 Jul 2024 14:59:08 +0200 Subject: [PATCH 07/14] fix: fixed the issue blocking GolemNetwork.disconnecte because of missing gftp --- src/golem-network/golem-network.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index 6c00f5908..31974e1e7 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -356,15 +356,14 @@ export class GolemNetwork { * @return Resolves when all shutdown steps are completed */ async disconnect() { - if (!this.isConnected()) { - return; - } if (this.disconnectPromise) { return this.disconnectPromise; } + this.disconnectPromise = this.startDisconnect().finally(() => { this.disconnectPromise = undefined; }); + return this.disconnectPromise; } From 37be0d3470c0c4af7234e441278fae53a6a83dc2 Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Wed, 31 Jul 2024 16:50:30 +0200 Subject: [PATCH 08/14] test: applied review remark and added a test --- src/market/scan/scanned-offer.test.ts | 22 ++++++++++++++++++++++ src/market/scan/scanned-offer.ts | 17 +++++++++-------- 2 files changed, 31 insertions(+), 8 deletions(-) create mode 100644 src/market/scan/scanned-offer.test.ts diff --git a/src/market/scan/scanned-offer.test.ts b/src/market/scan/scanned-offer.test.ts new file mode 100644 index 000000000..946cfa9d8 --- /dev/null +++ b/src/market/scan/scanned-offer.test.ts @@ -0,0 +1,22 @@ +import { ScannedOffer } from "./scanned-offer"; + +describe("Scanned Offer", () => { + test("Returns payment platform address information", async () => { + const offer = new ScannedOffer({ + offerId: "example-id", + properties: { + "golem.com.payment.platform.erc20-polygon-glm.address": "0xPolygonAddress", + "golem.com.payment.platform.erc20-holesky-tglm.address": "0xHoleskyAddress", + "golem.com.payment.platform.nonsense": "0xNonsense", + "some.other.prop": "with-a-value", + }, + timestamp: new Date().toISOString(), + providerId: "provider-id", + constraints: "", + }); + + expect(offer.paymentPlatformAddresses["erc20-polygon-glm"]).toEqual("0xPolygonAddress"); + expect(offer.paymentPlatformAddresses["erc20-holesky-tglm"]).toEqual("0xHoleskyAddress"); + expect(Object.entries(offer.paymentPlatformAddresses).length).toEqual(2); + }); +}); diff --git a/src/market/scan/scanned-offer.ts b/src/market/scan/scanned-offer.ts index 939cf2526..4388d889a 100644 --- a/src/market/scan/scanned-offer.ts +++ b/src/market/scan/scanned-offer.ts @@ -145,14 +145,15 @@ export class ScannedOffer { * ``` */ get paymentPlatformAddresses(): PaymentPlatformAddressSet { - const platformProps = Object.entries(this.model.properties).filter(([key]) => - key.startsWith("golem.com.payment.platform."), - ); - - const platformAddress = platformProps.map(([key, address]) => [ - key.replace("golem.com.payment.platform.", "").replace(".address", ""), - address, - ]); + const propRegex = /golem.com.payment.platform.([a-z0-9-]+).address/; + const platformProps = Object.entries(this.model.properties).filter(([key]) => key.match(propRegex)); + + const platformAddress = platformProps + .map<[string, string]>(([key, address]) => { + const match = key.match(propRegex); + return [match ? match[1] : "", address]; + }) + .filter(([key]) => !!key); return Object.fromEntries(platformAddress); } From 8aabace13fc956ed5c0b695a70365caeb821718a Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Thu, 1 Aug 2024 09:16:28 +0200 Subject: [PATCH 09/14] fix: fixed TcpProxy implementation so that it works with non-http use-cases as well --- .../rental-model/advanced/tcp-proxy/server.js | 39 ++++ .../advanced/tcp-proxy/tcp-proxy.ts | 86 ++++++++ src/market/market.module.test.ts | 4 +- src/network/tcpProxy.ts | 201 ++++++++++++++---- src/resource-rental/resource-rental.ts | 4 +- src/shared/utils/index.ts | 1 + src/shared/utils/wait.ts | 2 +- src/shared/yagna/event-reader.ts | 4 +- 8 files changed, 289 insertions(+), 52 deletions(-) create mode 100644 examples/rental-model/advanced/tcp-proxy/server.js create mode 100644 examples/rental-model/advanced/tcp-proxy/tcp-proxy.ts diff --git a/examples/rental-model/advanced/tcp-proxy/server.js b/examples/rental-model/advanced/tcp-proxy/server.js new file mode 100644 index 000000000..a1cb57ab1 --- /dev/null +++ b/examples/rental-model/advanced/tcp-proxy/server.js @@ -0,0 +1,39 @@ +/* eslint-disable */ +const http = require("http"); + +(async function main() { + const PORT = parseInt(process.env["PORT"] ?? "80"); + + // Increase the value if you want to test long response/liveliness scenarios + const SIMULATE_DELAY_SEC = parseInt(process.env["SIMULATE_DELAY_SEC"] ?? "0"); + + const respond = (res) => { + res.writeHead(200); + res.end("Hello Golem!"); + }; + + const app = http.createServer((req, res) => { + if (SIMULATE_DELAY_SEC > 0) { + setTimeout(() => { + respond(res); + }, SIMULATE_DELAY_SEC * 1000); + } else { + respond(res); + } + }); + + const server = app.listen(PORT, () => console.log(`HTTP server started at "http://localhost:${PORT}"`)); + + const shutdown = () => { + server.close((err) => { + if (err) { + console.error("Server close encountered an issue", err); + } else { + console.log("Server closed successfully"); + } + }); + }; + + process.on("SIGINT", shutdown); + process.on("SIGTERM", shutdown); +})(); diff --git a/examples/rental-model/advanced/tcp-proxy/tcp-proxy.ts b/examples/rental-model/advanced/tcp-proxy/tcp-proxy.ts new file mode 100644 index 000000000..da81e3a8c --- /dev/null +++ b/examples/rental-model/advanced/tcp-proxy/tcp-proxy.ts @@ -0,0 +1,86 @@ +import { GolemNetwork, waitFor } from "@golem-sdk/golem-js"; +import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; + +(async () => { + const logger = pinoPrettyLogger({ + level: "info", + }); + const glm = new GolemNetwork({ + logger, + }); + + try { + await glm.connect(); + + const network = await glm.createNetwork({ + ip: "10.0.0.0/24", + }); + + const rental = await glm.oneOf({ + order: { + demand: { + workload: { + imageTag: "golem/node:20-alpine", + capabilities: ["vpn"], + }, + }, + market: { + rentHours: 0.25, + pricing: { + model: "burn-rate", + avgGlmPerHour: 1, + }, + }, + network, + }, + }); + + const PORT_ON_PROVIDER = 80; + const PORT_ON_REQUESTOR = 8080; + + const exe = await rental.getExeUnit(); + + // Install the server script + await exe.uploadFile(`./rental-model/advanced/tcp-proxy/server.js`, "/golem/work/server.js"); + + // Start the server process on the provider + const server = await exe.runAndStream(`PORT=${PORT_ON_PROVIDER} node /golem/work/server.js`); + + server.stdout.subscribe((data) => console.log("provider>", data)); + server.stderr.subscribe((data) => console.error("provider>", data)); + + // Create a proxy instance + const proxy = exe.createTcpProxy(PORT_ON_PROVIDER); + proxy.events.on("error", (error) => console.error("TcpProxy reported an error:", error)); + + // Start listening and expose the port on your requestor machine + await proxy.listen(PORT_ON_REQUESTOR); + console.log(`Server Proxy listen at http://localhost:${PORT_ON_REQUESTOR}`); + + let isClosing = false; + const stopServer = async () => { + if (isClosing) { + console.log("Already closing, ignoring subsequent shutdown request"); + return; + } + + isClosing = true; + + console.log("Shutting down gracefully"); + await proxy.close(); + }; + + process.on("SIGINT", () => { + stopServer() + .then(() => rental.stopAndFinalize()) + .then(() => logger.info("Shutdown routine completed")) + .catch((err) => logger.error("Failed to shutdown cleanly", err)); + }); + + await waitFor(() => server.isFinished()); + } catch (error) { + logger.error("Failed to run the example", error); + } finally { + await glm.disconnect(); + } +})().catch(console.error); diff --git a/src/market/market.module.test.ts b/src/market/market.module.test.ts index c91634fa0..33d24f47a 100644 --- a/src/market/market.module.test.ts +++ b/src/market/market.module.test.ts @@ -12,7 +12,7 @@ import { Allocation, IPaymentApi } from "../payment"; import { INetworkApi, NetworkModule } from "../network"; import { DraftOfferProposalPool } from "./draft-offer-proposal-pool"; import { Agreement, AgreementEvent, ProviderInfo } from "./agreement"; -import { waitAndCall, waitForCondition } from "../shared/utils/wait"; +import { waitAndCall, waitFor } from "../shared/utils/wait"; import { MarketOrderSpec } from "../golem-network"; import { GolemAbortError } from "../shared/error/golem-error"; @@ -347,7 +347,7 @@ describe("Market module", () => { }); }); - await waitForCondition(() => draftListener.mock.calls.length > 0); + await waitFor(() => draftListener.mock.calls.length > 0); testSub.unsubscribe(); expect(draftListener).toHaveBeenCalledWith(draftProposal); diff --git a/src/network/tcpProxy.ts b/src/network/tcpProxy.ts index ae2bd70d9..d08a0b210 100644 --- a/src/network/tcpProxy.ts +++ b/src/network/tcpProxy.ts @@ -2,6 +2,7 @@ import net from "net"; import { WebSocket } from "ws"; import { EventEmitter } from "eventemitter3"; import { defaultLogger, Logger } from "../shared/utils"; +import { Buffer } from "buffer"; export interface TcpProxyEvents { /** Raised when the proxy encounters any sort of error */ @@ -31,6 +32,16 @@ export interface TcpProxyOptions { * **IMPORTANT** * * This feature is supported only in the Node.js environment. In has no effect in browsers. + * + * General solution description: + * + * - [x] Open a TCP server and listen to connections + * - [x] When a new connection arrives, establish a WS connection with yagna + * - [ ] Pass any incoming data from the client TCP socket to the WS, buffer it when the socket is not ready yet + * - [ ] Pass any returning data from the WS to the client TCP socket, but don't do it if the client socket already disconnected + * - [ ] When the WS will be closed, then close the client socket as well + * - [ ] When the client TCP socket will be closed, close the WS as well + * - [ ] Handle teardown of the TCP-WS bridge by clearing communication buffers to avoid memory leaks */ export class TcpProxy { private server: net.Server; @@ -58,68 +69,166 @@ export class TcpProxy { this.heartBeatSec = options.heartBeatSec ?? 10; this.logger = options.logger ? options.logger.child("tcp-proxy") : defaultLogger("tcp-proxy"); - this.server = new net.Server({ keepAlive: true }, (socket: net.Socket) => { - this.logger.debug("TcpProxy Server new incoming connection"); + this.server = net.createServer((client: net.Socket) => { + this.logger.debug("Client connected to TCP Server"); + + const state = { + /** Tells if the client socket is in a usable state */ + sReady: true, + /** Buffer for chunks of data that arrived from yagna's WS and should be delivered to the client socket when it's ready */ + sBuffer: [] as Buffer[], + /** Tells if the WS with yagna is ready for communication */ + wsReady: false, + /** Buffer for chunks of data that arrived from the client socket and should be sent to yagna's WS when it's ready */ + wsBuffer: [] as Buffer[], + }; + + const clearSocketBuffer = () => (state.sBuffer = []); + const clearWebSocketBuffer = () => (state.wsBuffer = []); + + // UTILITY METHODS + const flushSocketBuffer = () => { + this.logger.debug("Flushing Socket buffer"); + if (state.sBuffer.length > 0) { + client.write(Buffer.concat(state.sBuffer)); + } + clearSocketBuffer(); + }; + + const flushWebSocketBuffer = () => { + this.logger.debug("Flushing WebSocket buffer"); + if (state.wsBuffer.length > 0) { + ws.send(Buffer.concat(state.wsBuffer), { + binary: true, + mask: true, + }); + } + clearWebSocketBuffer(); + }; + + const teardownBridge = () => { + ws.close(); + client.end(); + clearWebSocketBuffer(); + clearSocketBuffer(); + }; const ws = new WebSocket(this.wsUrl, { headers: { authorization: `Bearer ${this.appKey}` } }); + // OPEN HANDLERS ws.on("open", () => { - this.logger.debug("TcpProxy Yagna WS opened"); + this.logger.debug("Yagna WS opened"); + state.wsReady = true; + // Push any pending data to the web-socket + flushWebSocketBuffer(); + }); + + // NOTE: That's not really required in our use-case, added for completeness of the flow + client.on("connect", () => { + this.logger.debug("Client socket connected"); + state.sReady = true; + // Push any pending data to the client socket + flushSocketBuffer(); + }); - // Register the actual data transfer - socket.on("data", async (chunk) => ws.send(chunk.toString())); + // ERROR HANDLERS + ws.on("error", (error) => { + this.notifyOfError("Yagna WS encountered an error", error); + teardownBridge(); }); - ws.on("message", (message) => socket.write(message.toString())); + client.on("error", (error) => { + this.notifyOfError("Server Socket encountered an error", error); + teardownBridge(); + }); + + // TERMINATION HANDLERS + + // When the WS socket will be closed + ws.on("close", () => { + clearInterval(heartBeatInt); + this.logger.debug("Yagna WS closed"); + client.end(); + clearWebSocketBuffer(); + clearSocketBuffer(); + }); ws.on("end", () => { - this.logger.debug("TcpProxy Yagna WS end"); - socket.end(); + this.logger.debug("Yagna WS end"); + client.end(); + clearWebSocketBuffer(); + clearSocketBuffer(); }); - ws.on("error", (error) => { - this.handleError("TcpProxy Yagna WS encountered an error", error); + // When the client will disconnect + client.on("close", (error) => { + if (error) { + this.logger.error("Server Socket encountered closed with an error error"); + } else { + this.logger.debug("Server Socket has been closed (client disconnected)"); + } + ws.close(); + clearWebSocketBuffer(); + clearSocketBuffer(); + }); + + // DATA TRANSFER + // Send data to the WebSocket or buffer if it's not ready yet + client.on("data", async (chunk) => { + this.logger.debug("Server Socket received data", { length: chunk.length, wsReady: state.wsReady }); + if (!state.wsReady) { + state.wsBuffer.push(chunk); + } else { + ws.send(chunk, { binary: true, mask: true }); + } }); + // Send data to the client or buffer if it's not ready yet + ws.on("message", (message) => { + const length = "length" in message ? message.length : null; + this.logger.debug("Yagna WS received data", { length, socketReady: state.sReady }); + if (message instanceof Buffer) { + if (!state.sReady) { + state.wsBuffer.push(message); + } else { + client.write(message); + } + } else { + // Defensive programming + this.logger.error("Encountered unsupported type of message", typeof message); + } + }); + + // WS health monitoring ws.on("ping", () => { - this.logger.debug("TcpProxy Yagna WS received ping event"); + this.logger.debug("Yagna WS received ping event"); }); // Configure pings to check the health of the WS to Yagna let isAlive = true; const heartBeat = () => { - this.logger.debug("TcpProxy Yagna WS checking if the socket is alive"); - if (!isAlive) { - this.handleError("TcpProxy Yagna WS doesn't seem to be healthy, going to terminate"); - // Previous check failed, time to terminate - return ws.terminate(); - } + if (state.wsReady) { + this.logger.debug("Yagna WS checking if the client is alive"); + if (!isAlive) { + this.notifyOfError("Yagna WS doesn't seem to be healthy, going to terminate"); + // Previous check failed, time to terminate + return ws.terminate(); + } - isAlive = false; - ws.ping(); + isAlive = false; + ws.ping(); + } else { + this.logger.debug("Yagna WS is not ready yet, skipping heart beat"); + } }; const heartBeatInt = setInterval(heartBeat, this.heartBeatSec * 1000); ws.on("pong", () => { - this.logger.debug("TcpProxy Yagna WS received pong event"); + this.logger.debug("Yagna WS received pong event"); isAlive = true; }); - - ws.on("close", () => { - clearInterval(heartBeatInt); - this.logger.debug("TcpProxy Yagna WS was closed"); - }); - - socket.on("error", (error) => { - this.handleError("TcpProxy Server Socket encountered an error", error); - }); - - socket.on("close", () => { - this.logger.debug("TcpProxy Server Socket has been closed"); - ws.close(); - }); }); this.attachDebugLogsToServer(); @@ -141,7 +250,7 @@ export class TcpProxy { return new Promise((resolve, reject) => { const handleError = (err: unknown) => { - this.handleError("TcpProxy failed to start listening", { port, err }); + this.notifyOfError("TcpProxy failed to start listening", { port, err }); this.server.removeListener("listening", handleListen); reject(err); }; @@ -161,35 +270,37 @@ export class TcpProxy { * Gracefully close the proxy */ public close() { - this.logger.debug("TcpProxy close initiated"); + this.logger.debug("TCP Server close initiated by the user"); return new Promise((resolve, reject) => { if (this.server.listening) { this.server?.close((err) => { if (err) { - this.handleError("TcpProxy failed to close properly", err); + this.notifyOfError("TCP Server closed with an error", err); reject(err); } else { - this.logger.info("TcpProxy closed - was listening"); + this.logger.info("TCP server closed - was listening"); resolve(); } }); } else { - this.logger.info("TcpProxy closed - was not listening"); + this.logger.info("TCP Server closed - was not listening"); resolve(); } }); } - private handleError(message: string, err?: unknown) { + private notifyOfError(message: string, err?: unknown) { this.logger.error(message, err); this.events.emit("error", `${message}: ${err}`); } private attachDebugLogsToServer() { - this.server.on("listening", () => this.logger.debug("TcpProxy Server event 'listening'")); - this.server.on("close", () => this.logger.debug("TcpProxy Server event 'close'")); - this.server.on("connection", () => this.logger.debug("TcpProxy Server event 'connection'")); - this.server.on("drop", (data) => this.logger.debug("TcpProxy Server event 'drop'", { data })); - this.server.on("error", (err) => this.logger.debug("TcpProxy Server event 'error'", err)); + this.server.on("listening", () => this.logger.debug("TCP Server started to listen")); + this.server.on("close", () => this.logger.debug("TCP Server closed")); + this.server.on("connection", () => this.logger.debug("TCP Server received new connection")); + this.server.on("drop", (data) => + this.logger.debug("TCP Server dropped a connection because of reaching `maxConnections`", { data }), + ); + this.server.on("error", (err) => this.logger.error("Server event 'error'", err)); } } diff --git a/src/resource-rental/resource-rental.ts b/src/resource-rental/resource-rental.ts index 9dfb99bc9..958b275c4 100644 --- a/src/resource-rental/resource-rental.ts +++ b/src/resource-rental/resource-rental.ts @@ -1,7 +1,7 @@ import { Agreement, MarketModule } from "../market"; import { AgreementPaymentProcess, PaymentProcessOptions } from "../payment/agreement_payment_process"; import { createAbortSignalFromTimeout, Logger } from "../shared/utils"; -import { waitForCondition } from "../shared/utils/wait"; +import { waitFor } from "../shared/utils/wait"; import { Activity, ActivityModule, ExeUnit, ExeUnitOptions } from "../activity"; import { StorageProvider } from "../shared/storage"; import { EventEmitter } from "eventemitter3"; @@ -77,7 +77,7 @@ export class ResourceRental { this.logger.info("Waiting for payment process of agreement to finish", { agreementId: this.agreement.id }); const abortSignal = createAbortSignalFromTimeout(signalOrTimeout); - await waitForCondition(() => this.paymentProcess.isFinished(), { + await waitFor(() => this.paymentProcess.isFinished(), { signalOrTimeout: abortSignal, }).catch((error) => { this.paymentProcess.stop(); diff --git a/src/shared/utils/index.ts b/src/shared/utils/index.ts index 63471f224..e50539ca7 100644 --- a/src/shared/utils/index.ts +++ b/src/shared/utils/index.ts @@ -10,3 +10,4 @@ export { YagnaApi, YagnaOptions } from "../yagna/yagnaApi"; export * from "./abortSignal"; export * from "./eventLoop"; export * from "./rxjs"; +export * from "./wait"; diff --git a/src/shared/utils/wait.ts b/src/shared/utils/wait.ts index 2fe8bb636..802f13e8e 100644 --- a/src/shared/utils/wait.ts +++ b/src/shared/utils/wait.ts @@ -11,7 +11,7 @@ import { createAbortSignalFromTimeout } from "./abortSignal"; * * @return {Promise} - Resolves when the condition is met or rejects with a timeout error if it wasn't met on time. */ -export function waitForCondition( +export function waitFor( check: () => boolean | Promise, opts?: { signalOrTimeout?: number | AbortSignal; intervalSeconds?: number }, ): Promise { diff --git a/src/shared/yagna/event-reader.ts b/src/shared/yagna/event-reader.ts index 9322b9d7b..48de021e7 100644 --- a/src/shared/yagna/event-reader.ts +++ b/src/shared/yagna/event-reader.ts @@ -1,7 +1,7 @@ import { Logger } from "../utils"; import { Subject } from "rxjs"; import { EventDTO } from "ya-ts-client/dist/market-api"; -import { waitForCondition } from "../utils/wait"; +import { waitFor } from "../utils/wait"; export type CancellablePoll = { /** User defined name of the event stream for ease of debugging */ @@ -79,7 +79,7 @@ export class EventReader { if (currentPoll) { currentPoll.cancel(); } - await waitForCondition(() => isFinished, { intervalSeconds: 0 }); + await waitFor(() => isFinished, { intervalSeconds: 0 }); logger.debug("Cancelled reading the events", { eventType }); }, }; From 1ebbbcb177650ad9882286d25a869eb44be9ba8a Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Thu, 1 Aug 2024 09:23:36 +0200 Subject: [PATCH 10/14] chore: self review fix, rename of the tcp proxy file --- src/activity/exe-unit/exe-unit.ts | 2 +- src/activity/exe-unit/index.ts | 2 +- src/index.ts | 2 +- src/network/{tcpProxy.ts => tcp-proxy.ts} | 14 +++++++------- 4 files changed, 10 insertions(+), 10 deletions(-) rename src/network/{tcpProxy.ts => tcp-proxy.ts} (93%) diff --git a/src/activity/exe-unit/exe-unit.ts b/src/activity/exe-unit/exe-unit.ts index 473488d78..247456ae6 100644 --- a/src/activity/exe-unit/exe-unit.ts +++ b/src/activity/exe-unit/exe-unit.ts @@ -20,7 +20,7 @@ import { RemoteProcess } from "./process"; import { GolemWorkError, WorkErrorCode } from "./error"; import { GolemAbortError, GolemConfigError, GolemTimeoutError } from "../../shared/error/golem-error"; import { Agreement, ProviderInfo } from "../../market"; -import { TcpProxy } from "../../network/tcpProxy"; +import { TcpProxy } from "../../network/tcp-proxy"; import { ExecutionOptions, ExeScriptExecutor } from "../exe-script-executor"; import { lastValueFrom, tap, toArray } from "rxjs"; diff --git a/src/activity/exe-unit/index.ts b/src/activity/exe-unit/index.ts index 41162c872..e1ace21ac 100644 --- a/src/activity/exe-unit/index.ts +++ b/src/activity/exe-unit/index.ts @@ -1,4 +1,4 @@ export { ExeUnit, LifecycleFunction, ExeUnitOptions } from "./exe-unit"; export { Batch } from "./batch"; export { GolemWorkError, WorkErrorCode } from "./error"; -export { TcpProxy } from "../../network/tcpProxy"; +export { TcpProxy } from "../../network/tcp-proxy"; diff --git a/src/index.ts b/src/index.ts index e4b53295b..a3497322a 100755 --- a/src/index.ts +++ b/src/index.ts @@ -10,7 +10,7 @@ export * from "./activity"; // Necessary domain entities for users to consume export * from "./shared/error/golem-error"; -export * from "./network/tcpProxy"; +export * from "./network/tcp-proxy"; // Internals export * from "./shared/utils"; diff --git a/src/network/tcpProxy.ts b/src/network/tcp-proxy.ts similarity index 93% rename from src/network/tcpProxy.ts rename to src/network/tcp-proxy.ts index d08a0b210..72d18179c 100644 --- a/src/network/tcpProxy.ts +++ b/src/network/tcp-proxy.ts @@ -35,13 +35,13 @@ export interface TcpProxyOptions { * * General solution description: * - * - [x] Open a TCP server and listen to connections - * - [x] When a new connection arrives, establish a WS connection with yagna - * - [ ] Pass any incoming data from the client TCP socket to the WS, buffer it when the socket is not ready yet - * - [ ] Pass any returning data from the WS to the client TCP socket, but don't do it if the client socket already disconnected - * - [ ] When the WS will be closed, then close the client socket as well - * - [ ] When the client TCP socket will be closed, close the WS as well - * - [ ] Handle teardown of the TCP-WS bridge by clearing communication buffers to avoid memory leaks + * - Open a TCP server and listen to connections + * - When a new connection arrives, establish a WS connection with yagna + * - Pass any incoming data from the client TCP socket to the WS, buffer it when the socket is not ready yet + * - Pass any returning data from the WS to the client TCP socket, but don't do it if the client socket already disconnected + * - When the WS will be closed, then close the client socket as well + * - When the client TCP socket will be closed, close the WS as well + * - Handle teardown of the TCP-WS bridge by clearing communication buffers to avoid memory leaks */ export class TcpProxy { private server: net.Server; From 4c3a61c6ba3d596ffd6319113caecd251bce09f1 Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Thu, 1 Aug 2024 10:16:40 +0200 Subject: [PATCH 11/14] chore: applied review remarks --- src/market/scan/scanned-offer.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/market/scan/scanned-offer.ts b/src/market/scan/scanned-offer.ts index 4388d889a..1f1a22726 100644 --- a/src/market/scan/scanned-offer.ts +++ b/src/market/scan/scanned-offer.ts @@ -145,10 +145,9 @@ export class ScannedOffer { * ``` */ get paymentPlatformAddresses(): PaymentPlatformAddressSet { - const propRegex = /golem.com.payment.platform.([a-z0-9-]+).address/; - const platformProps = Object.entries(this.model.properties).filter(([key]) => key.match(propRegex)); + const propRegex = /golem\.com\.payment\.platform\.([a-z0-9-]+)\.address/; - const platformAddress = platformProps + const platformAddress = Object.entries(this.model.properties) .map<[string, string]>(([key, address]) => { const match = key.match(propRegex); return [match ? match[1] : "", address]; From e8c453e2ed352f279e27da72e0be492900302705 Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Thu, 1 Aug 2024 22:01:45 +0200 Subject: [PATCH 12/14] chore: fixed example file paths in package.json --- examples/package.json | 31 +++++++++---------- .../{serveLocalGvmi.ts => local-image.ts} | 0 2 files changed, 15 insertions(+), 16 deletions(-) rename examples/rental-model/advanced/local-image/{serveLocalGvmi.ts => local-image.ts} (100%) diff --git a/examples/package.json b/examples/package.json index ceff24c42..d94076c14 100644 --- a/examples/package.json +++ b/examples/package.json @@ -5,23 +5,22 @@ "type": "module", "repository": "https://github.com/golemfactory/golem-js", "scripts": { - "basic-one-of": "tsx basic/one-of.ts", - "basic-many-of": "tsx basic/many-of.ts", - "basic-vpn": "tsx basic/vpn.ts", - "basic-transfer": "tsx basic/transfer.ts", - "basic-events": "tsx basic/events.ts", - "basic-run-and-stream": "tsx basic/run-and-stream.ts", - "advanced-hello-world": "tsx advanced/hello-world.ts", - "advanced-manual-pools": "tsx advanced/manual-pools.ts", - "advanced-step-by-step": "tsx advanced/step-by-step.ts", - "advanced-payment-filters": "tsx advanced/payment-filters.ts", - "advanced-proposal-filters": "tsx advanced/proposal-filter.ts", - "advanced-proposal-predefined-filter": "tsx advanced/proposal-predefined-filter.ts", - "advanced-scan": "tsx advanced/scan.ts", - "advanced-setup-and-teardown": "tsx advanced/setup-and-teardown.ts", - "local-image": "tsx advanced/local-image/serveLocalGvmi.ts", + "basic-one-of": "tsx rental-model/basic/one-of.ts", + "basic-many-of": "tsx rental-model/basic/many-of.ts", + "basic-vpn": "tsx rental-model/basic/vpn.ts", + "basic-transfer": "tsx rental-model/basic/transfer.ts", + "basic-events": "tsx rental-model/basic/events.ts", + "basic-run-and-stream": "tsx rental-model/basic/run-and-stream.ts", + "advanced-manual-pools": "tsx core-api/manual-pools.ts", + "advanced-step-by-step": "tsx core-api/step-by-step.ts", + "advanced-payment-filters": "tsx rental-model/advanced/payment-filters.ts", + "advanced-proposal-filters": "tsx rental-model/advanced/proposal-filter.ts", + "advanced-proposal-predefined-filter": "tsx rental-model/advanced/proposal-predefined-filter.ts", + "advanced-scan": "tsx core-api/scan.ts", + "advanced-setup-and-teardown": "tsx rental-model/advanced/setup-and-teardown.ts", + "tcp-proxy": "tsx rental-model/advanced/tcp-proxy/tcp-proxy.ts", + "local-image": "tsx rental-model/advanced/local-image/local-image.ts", "deployment": "tsx experimental/deployment/new-api.ts", - "market-scan": "tsx market/scan.ts", "preweb": "cp -r ../dist/ web/dist/", "postweb": "rm -rf web/dist/", "web": "serve web/", diff --git a/examples/rental-model/advanced/local-image/serveLocalGvmi.ts b/examples/rental-model/advanced/local-image/local-image.ts similarity index 100% rename from examples/rental-model/advanced/local-image/serveLocalGvmi.ts rename to examples/rental-model/advanced/local-image/local-image.ts From d5d16e23f8dd7658b06b2600d3460417d06a9060 Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Fri, 2 Aug 2024 12:18:46 +0200 Subject: [PATCH 13/14] fix(exe-unit): fixed RemoteProcess.isFinished termination detection, no assumed timeout in waitFor RemoteProcess.isFinished is now completing correctly when the user breaks runAndStream using an abort controller . In addition, waitFor is no longer assuming a timeout of 30s for the user. --- .../advanced/tcp-proxy/tcp-proxy.ts | 17 +++++++--- src/activity/exe-script-executor.ts | 2 ++ src/activity/exe-unit/exe-unit.ts | 1 + src/activity/exe-unit/process.ts | 10 +++--- src/market/market.module.test.ts | 3 +- src/resource-rental/resource-rental.ts | 2 +- src/shared/utils/wait.ts | 31 ++++++------------- 7 files changed, 34 insertions(+), 32 deletions(-) diff --git a/examples/rental-model/advanced/tcp-proxy/tcp-proxy.ts b/examples/rental-model/advanced/tcp-proxy/tcp-proxy.ts index da81e3a8c..28a2377e6 100644 --- a/examples/rental-model/advanced/tcp-proxy/tcp-proxy.ts +++ b/examples/rental-model/advanced/tcp-proxy/tcp-proxy.ts @@ -5,10 +5,13 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; const logger = pinoPrettyLogger({ level: "info", }); + const glm = new GolemNetwork({ logger, }); + const abortController = new AbortController(); + try { await glm.connect(); @@ -33,18 +36,21 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; }, network, }, + signalOrTimeout: abortController.signal, }); const PORT_ON_PROVIDER = 80; const PORT_ON_REQUESTOR = 8080; - const exe = await rental.getExeUnit(); + const exe = await rental.getExeUnit(abortController.signal); // Install the server script await exe.uploadFile(`./rental-model/advanced/tcp-proxy/server.js`, "/golem/work/server.js"); // Start the server process on the provider - const server = await exe.runAndStream(`PORT=${PORT_ON_PROVIDER} node /golem/work/server.js`); + const server = await exe.runAndStream(`PORT=${PORT_ON_PROVIDER} node /golem/work/server.js`, { + signalOrTimeout: abortController.signal, + }); server.stdout.subscribe((data) => console.log("provider>", data)); server.stderr.subscribe((data) => console.error("provider>", data)); @@ -60,24 +66,27 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; let isClosing = false; const stopServer = async () => { if (isClosing) { - console.log("Already closing, ignoring subsequent shutdown request"); + console.log("Already closing, ignoring subsequent shutdown request. Process PID: %d", process.pid); return; } + abortController.abort("SIGINT called"); + isClosing = true; console.log("Shutting down gracefully"); await proxy.close(); + logger.info("Shutdown routine completed"); }; process.on("SIGINT", () => { stopServer() .then(() => rental.stopAndFinalize()) - .then(() => logger.info("Shutdown routine completed")) .catch((err) => logger.error("Failed to shutdown cleanly", err)); }); await waitFor(() => server.isFinished()); + console.log("Server process finished"); } catch (error) { logger.error("Failed to run the example", error); } finally { diff --git a/src/activity/exe-script-executor.ts b/src/activity/exe-script-executor.ts index de768b190..f7db15b26 100644 --- a/src/activity/exe-script-executor.ts +++ b/src/activity/exe-script-executor.ts @@ -77,6 +77,7 @@ export class ExeScriptExecutor { if (this.abortSignal.aborted) { throw new GolemAbortError("Executions of script has been aborted", this.abortSignal.reason); } + throw new GolemWorkError( `Unable to execute script. ${message}`, WorkErrorCode.ScriptExecutionFailed, @@ -114,6 +115,7 @@ export class ExeScriptExecutor { if (signal.aborted) { subscriber.error(getError()); } + signal.addEventListener("abort", () => { subscriber.error(getError()); }); diff --git a/src/activity/exe-unit/exe-unit.ts b/src/activity/exe-unit/exe-unit.ts index 247456ae6..f32d68ca7 100644 --- a/src/activity/exe-unit/exe-unit.ts +++ b/src/activity/exe-unit/exe-unit.ts @@ -252,6 +252,7 @@ export class ExeUnit { // In this case, the script consists only of one run command, // so we skip the execution of script.before and script.after const executionMetadata = await this.executor.execute(script.getExeScriptRequest()); + const activityResult$ = this.executor.getResultsObservable( executionMetadata, true, diff --git a/src/activity/exe-unit/process.ts b/src/activity/exe-unit/process.ts index a7ef22b3d..913c12f7e 100644 --- a/src/activity/exe-unit/process.ts +++ b/src/activity/exe-unit/process.ts @@ -2,14 +2,14 @@ import { Activity, ActivityModule, Result } from "../index"; import { GolemWorkError, WorkErrorCode } from "./error"; import { GolemTimeoutError } from "../../shared/error/golem-error"; import { Logger } from "../../shared/utils"; -import { Observable, Subject, Subscription, finalize } from "rxjs"; +import { finalize, Observable, Subject, Subscription } from "rxjs"; const DEFAULTS = { exitWaitingTimeout: 20_000, }; /** - * RemoteProcess class representing the process spawned on the provider by {@link activity/exe-unit/exeunit.ExeUnit.runAndStream} + * RemoteProcess class representing the process spawned on the provider by {@link ExeUnit.runAndStream} */ export class RemoteProcess { /** @@ -46,7 +46,9 @@ export class RemoteProcess { if (result.stdout) this.stdout.next(result.stdout); if (result.stderr) this.stderr.next(result.stderr); }, - error: (error) => (this.streamError = error), + error: (error) => { + this.streamError = error; + }, }); } @@ -100,6 +102,6 @@ export class RemoteProcess { * Checks if the exe-script batch from Yagna has completed, reflecting all work and streaming to be completed */ isFinished() { - return this.lastResult?.isBatchFinished ?? false; + return this.subscription.closed; } } diff --git a/src/market/market.module.test.ts b/src/market/market.module.test.ts index 33d24f47a..da51542ca 100644 --- a/src/market/market.module.test.ts +++ b/src/market/market.module.test.ts @@ -1,5 +1,5 @@ import { _, imock, instance, mock, reset, spy, verify, when } from "@johanblumenberg/ts-mockito"; -import { Logger, YagnaApi } from "../shared/utils"; +import { Logger, waitAndCall, waitFor, YagnaApi } from "../shared/utils"; import { MarketModuleImpl } from "./market.module"; import { Demand, DemandSpecification } from "./demand"; import { Subject, take } from "rxjs"; @@ -12,7 +12,6 @@ import { Allocation, IPaymentApi } from "../payment"; import { INetworkApi, NetworkModule } from "../network"; import { DraftOfferProposalPool } from "./draft-offer-proposal-pool"; import { Agreement, AgreementEvent, ProviderInfo } from "./agreement"; -import { waitAndCall, waitFor } from "../shared/utils/wait"; import { MarketOrderSpec } from "../golem-network"; import { GolemAbortError } from "../shared/error/golem-error"; diff --git a/src/resource-rental/resource-rental.ts b/src/resource-rental/resource-rental.ts index 958b275c4..7b837b903 100644 --- a/src/resource-rental/resource-rental.ts +++ b/src/resource-rental/resource-rental.ts @@ -78,7 +78,7 @@ export class ResourceRental { this.logger.info("Waiting for payment process of agreement to finish", { agreementId: this.agreement.id }); const abortSignal = createAbortSignalFromTimeout(signalOrTimeout); await waitFor(() => this.paymentProcess.isFinished(), { - signalOrTimeout: abortSignal, + abortSignal: abortSignal, }).catch((error) => { this.paymentProcess.stop(); if (error instanceof GolemTimeoutError) { diff --git a/src/shared/utils/wait.ts b/src/shared/utils/wait.ts index 802f13e8e..37bc47085 100644 --- a/src/shared/utils/wait.ts +++ b/src/shared/utils/wait.ts @@ -1,47 +1,36 @@ -import { GolemAbortError, GolemTimeoutError } from "../error/golem-error"; -import { createAbortSignalFromTimeout } from "./abortSignal"; +import { GolemAbortError } from "../error/golem-error"; /** * Utility function that helps to block the execution until a condition is met (check returns true) or the timeout happens. * * @param {function} check - The function checking if the condition is met. * @param {Object} [opts] - Options controlling the timeout and check interval in seconds. - * @param {number} [opts.signalOrTimeout=30_000] - The timeout value in miliseconds or AbortSignal. + * @param {AbortSignal} [opts.abortSignal] - AbortSignal to respect when waiting for the condition to be met * @param {number} [opts.intervalSeconds=1] - The interval between condition checks in seconds. * * @return {Promise} - Resolves when the condition is met or rejects with a timeout error if it wasn't met on time. */ export function waitFor( check: () => boolean | Promise, - opts?: { signalOrTimeout?: number | AbortSignal; intervalSeconds?: number }, + opts?: { abortSignal?: AbortSignal; intervalSeconds?: number }, ): Promise { - const abortSignal = createAbortSignalFromTimeout(opts?.signalOrTimeout ?? 30_000); const intervalSeconds = opts?.intervalSeconds ?? 1; + let verifyInterval: NodeJS.Timeout | undefined; - const verify = new Promise((resolve) => { + const verify = new Promise((resolve, reject) => { verifyInterval = setInterval(async () => { + if (opts?.abortSignal?.aborted) { + reject(new GolemAbortError("Waiting for a condition has been aborted", opts.abortSignal.reason)); + } + if (await check()) { resolve(); } }, intervalSeconds * 1000); }); - const wait = new Promise((_, reject) => { - const abortError = new GolemAbortError("Waiting for a condition has been aborted", abortSignal.reason); - if (abortSignal.aborted) { - return reject(abortError); - } - abortSignal.addEventListener("abort", () => - reject( - abortSignal.reason.name === "TimeoutError" - ? new GolemTimeoutError(`Waiting for a condition has been aborted due to a timeout`, abortSignal.reason) - : abortError, - ), - ); - }); - - return Promise.race([verify, wait]).finally(() => { + return verify.finally(() => { clearInterval(verifyInterval); }); } From cc78c2fd264d5f795c0706c550ace9d7eb24d178 Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Fri, 2 Aug 2024 13:03:24 +0200 Subject: [PATCH 14/14] build: fix broken e2e test assertion --- tests/e2e/resourceRentalPool.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/resourceRentalPool.spec.ts b/tests/e2e/resourceRentalPool.spec.ts index a07a971c0..8bf43e112 100644 --- a/tests/e2e/resourceRentalPool.spec.ts +++ b/tests/e2e/resourceRentalPool.spec.ts @@ -237,7 +237,7 @@ describe("ResourceRentalPool", () => { const rental = await pool.acquire(); await rental.getExeUnit(); await expect(rental.stopAndFinalize(10)).rejects.toThrow( - new GolemAbortError("The finalization of payment process has been aborted due to a timeout"), + new GolemAbortError("The finalization of payment process has been aborted"), ); });