Skip to content

Commit

Permalink
Merge pull request #1041 from golemfactory/bugfix/tcp-proxy-patch
Browse files Browse the repository at this point in the history
Fix TcpProxy for non HTTP communication
  • Loading branch information
mgordel authored Aug 2, 2024
2 parents 1fd6d6e + d5d16e2 commit 28270ab
Show file tree
Hide file tree
Showing 16 changed files with 486 additions and 248 deletions.
31 changes: 15 additions & 16 deletions examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,22 @@
"type": "module",
"repository": "https://github.com/golemfactory/golem-js",
"scripts": {
"basic-one-of": "tsx basic/one-of.ts",
"basic-many-of": "tsx basic/many-of.ts",
"basic-vpn": "tsx basic/vpn.ts",
"basic-transfer": "tsx basic/transfer.ts",
"basic-events": "tsx basic/events.ts",
"basic-run-and-stream": "tsx basic/run-and-stream.ts",
"advanced-hello-world": "tsx advanced/hello-world.ts",
"advanced-manual-pools": "tsx advanced/manual-pools.ts",
"advanced-step-by-step": "tsx advanced/step-by-step.ts",
"advanced-payment-filters": "tsx advanced/payment-filters.ts",
"advanced-proposal-filters": "tsx advanced/proposal-filter.ts",
"advanced-proposal-predefined-filter": "tsx advanced/proposal-predefined-filter.ts",
"advanced-scan": "tsx advanced/scan.ts",
"advanced-setup-and-teardown": "tsx advanced/setup-and-teardown.ts",
"local-image": "tsx advanced/local-image/serveLocalGvmi.ts",
"basic-one-of": "tsx rental-model/basic/one-of.ts",
"basic-many-of": "tsx rental-model/basic/many-of.ts",
"basic-vpn": "tsx rental-model/basic/vpn.ts",
"basic-transfer": "tsx rental-model/basic/transfer.ts",
"basic-events": "tsx rental-model/basic/events.ts",
"basic-run-and-stream": "tsx rental-model/basic/run-and-stream.ts",
"advanced-manual-pools": "tsx core-api/manual-pools.ts",
"advanced-step-by-step": "tsx core-api/step-by-step.ts",
"advanced-payment-filters": "tsx rental-model/advanced/payment-filters.ts",
"advanced-proposal-filters": "tsx rental-model/advanced/proposal-filter.ts",
"advanced-proposal-predefined-filter": "tsx rental-model/advanced/proposal-predefined-filter.ts",
"advanced-scan": "tsx core-api/scan.ts",
"advanced-setup-and-teardown": "tsx rental-model/advanced/setup-and-teardown.ts",
"tcp-proxy": "tsx rental-model/advanced/tcp-proxy/tcp-proxy.ts",
"local-image": "tsx rental-model/advanced/local-image/local-image.ts",
"deployment": "tsx experimental/deployment/new-api.ts",
"market-scan": "tsx market/scan.ts",
"preweb": "cp -r ../dist/ web/dist/",
"postweb": "rm -rf web/dist/",
"web": "serve web/",
Expand Down
39 changes: 39 additions & 0 deletions examples/rental-model/advanced/tcp-proxy/server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* eslint-disable */
const http = require("http");

(async function main() {
const PORT = parseInt(process.env["PORT"] ?? "80");

// Increase the value if you want to test long response/liveliness scenarios
const SIMULATE_DELAY_SEC = parseInt(process.env["SIMULATE_DELAY_SEC"] ?? "0");

const respond = (res) => {
res.writeHead(200);
res.end("Hello Golem!");
};

const app = http.createServer((req, res) => {
if (SIMULATE_DELAY_SEC > 0) {
setTimeout(() => {
respond(res);
}, SIMULATE_DELAY_SEC * 1000);
} else {
respond(res);
}
});

const server = app.listen(PORT, () => console.log(`HTTP server started at "http://localhost:${PORT}"`));

const shutdown = () => {
server.close((err) => {
if (err) {
console.error("Server close encountered an issue", err);
} else {
console.log("Server closed successfully");
}
});
};

process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);
})();
95 changes: 95 additions & 0 deletions examples/rental-model/advanced/tcp-proxy/tcp-proxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { GolemNetwork, waitFor } from "@golem-sdk/golem-js";
import { pinoPrettyLogger } from "@golem-sdk/pino-logger";

(async () => {
const logger = pinoPrettyLogger({
level: "info",
});

const glm = new GolemNetwork({
logger,
});

const abortController = new AbortController();

try {
await glm.connect();

const network = await glm.createNetwork({
ip: "10.0.0.0/24",
});

const rental = await glm.oneOf({
order: {
demand: {
workload: {
imageTag: "golem/node:20-alpine",
capabilities: ["vpn"],
},
},
market: {
rentHours: 0.25,
pricing: {
model: "burn-rate",
avgGlmPerHour: 1,
},
},
network,
},
signalOrTimeout: abortController.signal,
});

const PORT_ON_PROVIDER = 80;
const PORT_ON_REQUESTOR = 8080;

const exe = await rental.getExeUnit(abortController.signal);

// Install the server script
await exe.uploadFile(`./rental-model/advanced/tcp-proxy/server.js`, "/golem/work/server.js");

// Start the server process on the provider
const server = await exe.runAndStream(`PORT=${PORT_ON_PROVIDER} node /golem/work/server.js`, {
signalOrTimeout: abortController.signal,
});

server.stdout.subscribe((data) => console.log("provider>", data));
server.stderr.subscribe((data) => console.error("provider>", data));

// Create a proxy instance
const proxy = exe.createTcpProxy(PORT_ON_PROVIDER);
proxy.events.on("error", (error) => console.error("TcpProxy reported an error:", error));

// Start listening and expose the port on your requestor machine
await proxy.listen(PORT_ON_REQUESTOR);
console.log(`Server Proxy listen at http://localhost:${PORT_ON_REQUESTOR}`);

let isClosing = false;
const stopServer = async () => {
if (isClosing) {
console.log("Already closing, ignoring subsequent shutdown request. Process PID: %d", process.pid);
return;
}

abortController.abort("SIGINT called");

isClosing = true;

console.log("Shutting down gracefully");
await proxy.close();
logger.info("Shutdown routine completed");
};

process.on("SIGINT", () => {
stopServer()
.then(() => rental.stopAndFinalize())
.catch((err) => logger.error("Failed to shutdown cleanly", err));
});

await waitFor(() => server.isFinished());
console.log("Server process finished");
} catch (error) {
logger.error("Failed to run the example", error);
} finally {
await glm.disconnect();
}
})().catch(console.error);
2 changes: 2 additions & 0 deletions src/activity/exe-script-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export class ExeScriptExecutor {
if (this.abortSignal.aborted) {
throw new GolemAbortError("Executions of script has been aborted", this.abortSignal.reason);
}

throw new GolemWorkError(
`Unable to execute script. ${message}`,
WorkErrorCode.ScriptExecutionFailed,
Expand Down Expand Up @@ -114,6 +115,7 @@ export class ExeScriptExecutor {
if (signal.aborted) {
subscriber.error(getError());
}

signal.addEventListener("abort", () => {
subscriber.error(getError());
});
Expand Down
3 changes: 2 additions & 1 deletion src/activity/exe-unit/exe-unit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { RemoteProcess } from "./process";
import { GolemWorkError, WorkErrorCode } from "./error";
import { GolemAbortError, GolemConfigError, GolemTimeoutError } from "../../shared/error/golem-error";
import { Agreement, ProviderInfo } from "../../market";
import { TcpProxy } from "../../network/tcpProxy";
import { TcpProxy } from "../../network/tcp-proxy";
import { ExecutionOptions, ExeScriptExecutor } from "../exe-script-executor";
import { lastValueFrom, tap, toArray } from "rxjs";

Expand Down Expand Up @@ -252,6 +252,7 @@ export class ExeUnit {
// In this case, the script consists only of one run command,
// so we skip the execution of script.before and script.after
const executionMetadata = await this.executor.execute(script.getExeScriptRequest());

const activityResult$ = this.executor.getResultsObservable(
executionMetadata,
true,
Expand Down
2 changes: 1 addition & 1 deletion src/activity/exe-unit/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export { ExeUnit, LifecycleFunction, ExeUnitOptions } from "./exe-unit";
export { Batch } from "./batch";
export { GolemWorkError, WorkErrorCode } from "./error";
export { TcpProxy } from "../../network/tcpProxy";
export { TcpProxy } from "../../network/tcp-proxy";
10 changes: 6 additions & 4 deletions src/activity/exe-unit/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import { Activity, ActivityModule, Result } from "../index";
import { GolemWorkError, WorkErrorCode } from "./error";
import { GolemTimeoutError } from "../../shared/error/golem-error";
import { Logger } from "../../shared/utils";
import { Observable, Subject, Subscription, finalize } from "rxjs";
import { finalize, Observable, Subject, Subscription } from "rxjs";

const DEFAULTS = {
exitWaitingTimeout: 20_000,
};

/**
* RemoteProcess class representing the process spawned on the provider by {@link activity/exe-unit/exeunit.ExeUnit.runAndStream}
* RemoteProcess class representing the process spawned on the provider by {@link ExeUnit.runAndStream}
*/
export class RemoteProcess {
/**
Expand Down Expand Up @@ -46,7 +46,9 @@ export class RemoteProcess {
if (result.stdout) this.stdout.next(result.stdout);
if (result.stderr) this.stderr.next(result.stderr);
},
error: (error) => (this.streamError = error),
error: (error) => {
this.streamError = error;
},
});
}

Expand Down Expand Up @@ -100,6 +102,6 @@ export class RemoteProcess {
* Checks if the exe-script batch from Yagna has completed, reflecting all work and streaming to be completed
*/
isFinished() {
return this.lastResult?.isBatchFinished ?? false;
return this.subscription.closed;
}
}
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export * from "./activity";

// Necessary domain entities for users to consume
export * from "./shared/error/golem-error";
export * from "./network/tcpProxy";
export * from "./network/tcp-proxy";

// Internals
export * from "./shared/utils";
Expand Down
5 changes: 2 additions & 3 deletions src/market/market.module.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { _, imock, instance, mock, reset, spy, verify, when } from "@johanblumenberg/ts-mockito";
import { Logger, YagnaApi } from "../shared/utils";
import { Logger, waitAndCall, waitFor, YagnaApi } from "../shared/utils";
import { MarketModuleImpl } from "./market.module";
import { Demand, DemandSpecification } from "./demand";
import { Subject, take } from "rxjs";
Expand All @@ -12,7 +12,6 @@ import { Allocation, IPaymentApi } from "../payment";
import { INetworkApi, NetworkModule } from "../network";
import { DraftOfferProposalPool } from "./draft-offer-proposal-pool";
import { Agreement, AgreementEvent, ProviderInfo } from "./agreement";
import { waitAndCall, waitForCondition } from "../shared/utils/wait";
import { MarketOrderSpec } from "../golem-network";
import { GolemAbortError } from "../shared/error/golem-error";

Expand Down Expand Up @@ -347,7 +346,7 @@ describe("Market module", () => {
});
});

await waitForCondition(() => draftListener.mock.calls.length > 0);
await waitFor(() => draftListener.mock.calls.length > 0);
testSub.unsubscribe();

expect(draftListener).toHaveBeenCalledWith(draftProposal);
Expand Down
Loading

0 comments on commit 28270ab

Please sign in to comment.