diff --git a/examples/basic/run-and-stream.ts b/examples/basic/run-and-stream.ts new file mode 100644 index 000000000..4ff3af019 --- /dev/null +++ b/examples/basic/run-and-stream.ts @@ -0,0 +1,60 @@ +import { MarketOrderSpec, GolemNetwork } from "@golem-sdk/golem-js"; +import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; + +/** + * Example demonstrating the execution of a command on a provider which may take a long time + * and returns the results from the execution during the command as a stream + */ + +const order: MarketOrderSpec = { + demand: { + workload: { imageTag: "golem/alpine:latest" }, + }, + market: { + maxAgreements: 1, + rentHours: 0.5, + pricing: { + model: "linear", + maxStartPrice: 0.5, + maxCpuPerHourPrice: 1.0, + maxEnvPerHourPrice: 0.5, + }, + }, +}; + +(async () => { + const glm = new GolemNetwork({ + logger: pinoPrettyLogger({ + level: "info", + }), + }); + + try { + await glm.connect(); + const lease = await glm.oneOf(order); + const exe = await lease.getExeUnit(); + + const remoteProcess = await exe.runAndStream( + ` + sleep 1 + echo -n 'Hello from stdout' >&1 + echo -n 'Hello from stderr' >&2 + sleep 1 + echo -n 'Hello from stdout again' >&1 + echo -n 'Hello from stderr again' >&2 + sleep 1 + echo -n 'Hello from stdout yet again' >&1 + echo -n 'Hello from stderr yet again' >&2 + `, + ); + remoteProcess.stdout.on("data", (data) => console.log("stdout>", data)); + remoteProcess.stderr.on("data", (data) => console.error("stderr>", data)); + await remoteProcess.waitForExit(); + + await lease.finalize(); + } catch (err) { + console.error("Failed to run the example", err); + } finally { + await glm.disconnect(); + } +})().catch(console.error); diff --git a/examples/package.json b/examples/package.json index cbf7252bf..80ba69359 100644 --- a/examples/package.json +++ b/examples/package.json @@ -9,6 +9,7 @@ "basic-many-of": "tsx basic/many-of.ts", "basic-vpn": "tsx basic/vpn.ts", "basic-transfer": "tsx basic/transfer.ts", + "basic-run-and-stream": "tsx basic/run-and-stream.ts", "advanced-hello-world": "tsx advanced/hello-world.ts", "advanced-manual-pools": "tsx advanced/manual-pools.ts", "advanced-payment-filters": "tsx advanced/payment-filters.ts", diff --git a/src/shared/yagna/yagnaApi.ts b/src/shared/yagna/yagnaApi.ts index 90d6f19de..c391bdf05 100644 --- a/src/shared/yagna/yagnaApi.ts +++ b/src/shared/yagna/yagnaApi.ts @@ -117,22 +117,25 @@ export class YagnaApi { BASE: `${this.basePath}/activity-api/v1`, HEADERS: commonHeaders, }); - this.activity = { control: activityApiClient.requestorControl, state: activityApiClient.requestorState, exec: { observeBatchExecResults: (activityId: string, batchId: string) => { return new Observable((observer) => { - const eventSource = new EventSource(`${this.basePath}/activity/${activityId}/exec/${batchId}`, { - headers: { - Accept: "text/event-stream", - Authorization: `Bearer ${apiKey}`, + const eventSource = new EventSource( + `${this.basePath}/activity-api/v1/activity/${activityId}/exec/${batchId}`, + { + headers: { + Accept: "text/event-stream", + Authorization: `Bearer ${apiKey}`, + }, }, - }); + ); eventSource.addEventListener("runtime", (event) => observer.next(JSON.parse(event.data))); eventSource.addEventListener("error", (error) => observer.error(error)); + return () => eventSource.close(); }); }, }, diff --git a/tests/examples/examples.json b/tests/examples/examples.json index 5bf5bee99..0c9ec8958 100644 --- a/tests/examples/examples.json +++ b/tests/examples/examples.json @@ -3,6 +3,7 @@ { "cmd": "tsx", "path": "examples/basic/one-of.ts" }, { "cmd": "tsx", "path": "examples/basic/vpn.ts" }, { "cmd": "tsx", "path": "examples/basic/transfer.ts" }, + { "cmd": "tsx", "path": "examples/basic/run-and-stream.ts" }, { "cmd": "tsx", "path": "examples/advanced/hello-world.ts" }, { "cmd": "tsx", "path": "examples/advanced/manual-pools.ts" }, { "cmd": "tsx", "path": "examples/advanced/payment-filters.ts" },