Skip to content

Commit

Permalink
Merge branch 'beta' into feature/JST-516
Browse files Browse the repository at this point in the history
  • Loading branch information
pgrzy-golem authored Oct 19, 2023
2 parents 28f941d + 17d90fa commit 0b11ad8
Show file tree
Hide file tree
Showing 15 changed files with 274 additions and 134 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/examples-nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:

- name: Fund the requestor
# Use a funding script which will retry funding the requestor 3 times, else it exits with error. The faucet is not reliable and sometimes fails to fund the requestor, thus the retry.
run: sleep 4 && docker exec -t docker-requestor-1 /bin/sh -c "/golem-js/tests/docker/fundRequestor.sh"
run: sleep 10 && docker exec -t docker-requestor-1 /bin/sh -c "/golem-js/tests/docker/fundRequestor.sh"

- name: Install and build the SDK in the docker container
run: |
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/goth-nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ jobs:

- name: Fund the requestor
# Use a funding script which will retry funding the requestor 3 times, else it exits with error. The faucet is not reliable and sometimes fails to fund the requestor, thus the retry.
run: sleep 4 && docker exec -t docker-requestor-1 /bin/sh -c "/golem-js/tests/docker/fundRequestor.sh"
run: sleep 10 && docker exec -t docker-requestor-1 /bin/sh -c "/golem-js/tests/docker/fundRequestor.sh"

- name: Install and build the SDK in the docker container
run: |
docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && npm i && npm run build"
- name: Start the e2e test
run: docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && npm i && npm run test:e2e"
run: docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && npm i && npm run test:e2e -- --reporters github-actions --reporters summary"

- name: Cleanup Docker
if: always()
Expand Down
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,21 @@ First, build the Docker containers using the `docker-compose.yml` file located u

Execute this command to build the Docker containers:

docker compose -f tests/docker/docker-compose.yml build
docker-compose -f tests/docker/docker-compose.yml build

##### Start Docker Containers

Then, launch the Docker containers you've just built using the same `docker-compose.yml` file.

Execute this command to start the Docker containers:

docker compose -f tests/docker/docker-compose.yml down && docker compose -f tests/docker/docker-compose.yml up -d
docker-compose -f tests/docker/docker-compose.yml down && docker-compose -f tests/docker/docker-compose.yml up -d

##### Fund the Requestor

The next step is to fund the requestor.

docker exec -t docker-requestor-1 /bin/sh -c "/golem-js/tests/docker/fundRequestor.sh"
docker exec -t docker_requestor_1 /bin/sh -c "/golem-js/tests/docker/fundRequestor.sh"

##### Install and Build the SDK

Expand All @@ -155,29 +155,29 @@ Finally, install and build the golem-js SDK in the Docker container
Run this chain of commands to install and build the SDK and prepare cypress.

```docker
docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && npm i && npm run build && ./node_modules/.bin/cypress install"
docker exec -t docker_requestor_1 /bin/sh -c "cd /golem-js && npm i && npm run build && ./node_modules/.bin/cypress install"
```

#### Execute the E2E Tests

With your test environment set up, you can now initiate the E2E tests. Run the following command to start:

```docker
docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && npm run test:e2e"
docker exec -t docker_requestor_1 /bin/sh -c "cd /golem-js && npm run test:e2e"
```

#### Execute the cypress Tests

First make sure that the webserver that's used for testing is running, by running the command

```docker
docker exec -t -d docker-requestor-1 /bin/sh -c "cd /golem-js/examples/web && node app.mjs"
docker exec -t -d docker_requestor_1 /bin/sh -c "cd /golem-js/examples/web && node app.mjs"
```

Now you're ready to start the cypress tests by running the command

```docker
docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && npm run test:cypress -- --browser chromium"
docker exec -t docker_requestor_1 /bin/sh -c "cd /golem-js && npm run test:cypress -- --browser chromium"
```

### Contributing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const myFilter = async (proposal) => {
package: "9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae",
proposalFilter: myFilter,
yagnaOptions: { apiKey: "try_golem" },
startupTimeout: 60_000,
});
await executor.run(async (ctx) =>
console.log((await ctx.run(`echo "This task is run on ${ctx.provider.id}"`)).stdout, ctx.provider.id),
Expand Down
3 changes: 3 additions & 0 deletions src/executor/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const DEFAULTS = Object.freeze({
taskTimeout: 1000 * 60 * 5, // 5 min,
maxTaskRetries: 3,
enableLogging: true,
startupTimeout: 1000 * 30, // 30 sec
});

/**
Expand All @@ -34,6 +35,7 @@ export class ExecutorConfig {
readonly maxTaskRetries: number;
readonly activityExecuteTimeout?: number;
readonly jobStorage: JobStorage;
readonly startupTimeout: number;

constructor(options: ExecutorOptions & ActivityOptions) {
const processEnv = !runtimeContextChecker.isBrowser
Expand Down Expand Up @@ -83,5 +85,6 @@ export class ExecutorConfig {
this.eventTarget = options.eventTarget || new EventTarget();
this.maxTaskRetries = options.maxTaskRetries ?? DEFAULTS.maxTaskRetries;
this.jobStorage = options.jobStorage || new InMemoryJobStorage();
this.startupTimeout = options.startupTimeout ?? DEFAULTS.startupTimeout;
}
}
64 changes: 64 additions & 0 deletions src/executor/executor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { MarketService } from "../market/";
import { AgreementPoolService } from "../agreement/";
import { TaskService } from "../task/";
import { TaskExecutor } from "./executor";
import { sleep } from "../utils";
import { LoggerMock } from "../../tests/mock";

jest.mock("../market/service");
jest.mock("../agreement/service");
jest.mock("../network/service");
jest.mock("../task/service");
jest.mock("../storage/gftp");
jest.mock("../utils/yagna/yagna");

const serviceRunSpy = jest.fn().mockImplementation(() => Promise.resolve());
jest.spyOn(MarketService.prototype, "run").mockImplementation(serviceRunSpy);
jest.spyOn(AgreementPoolService.prototype, "run").mockImplementation(serviceRunSpy);
jest.spyOn(TaskService.prototype, "run").mockImplementation(serviceRunSpy);

jest.mock("../payment/service", () => {
return {
PaymentService: jest.fn().mockImplementation(() => {
return {
config: { payment: { network: "test" } },
createAllocation: jest.fn(),
run: serviceRunSpy,
end: jest.fn(),
};
}),
};
});

describe("Task Executor", () => {
const logger = new LoggerMock();
const yagnaOptions = { apiKey: "test" };
beforeEach(() => {
jest.clearAllMocks();
logger.clear();
});

describe("init()", () => {
it("should run all set services", async () => {
const executor = await TaskExecutor.create({ package: "test", logger, yagnaOptions });
expect(serviceRunSpy).toHaveBeenCalledTimes(4);
expect(executor).toBeDefined();
await executor.end();
});
it("should handle a critical error if startup timeout is reached", async () => {
const executor = await TaskExecutor.create({ package: "test", startupTimeout: 0, logger, yagnaOptions });
jest
.spyOn(MarketService.prototype, "getProposalsCount")
.mockImplementation(() => ({ confirmed: 0, initial: 0, rejected: 0 }));
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const handleErrorSpy = jest.spyOn(executor as any, "handleCriticalError").mockImplementation((error) => {
expect((error as Error).message).toEqual(
"Could not start any work on Golem. Processed 0 initial proposals from yagna, filters accepted 0. Check your demand if it's not too restrictive or restart yagna.",
);
});
await sleep(10, true);
expect(handleErrorSpy).toHaveBeenCalled();
await executor.end();
});
});
});
37 changes: 36 additions & 1 deletion src/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ export type ExecutorOptions = {
* Note: If you decide to set this to `true`, you will be responsible for proper shutdown of task executor.
*/
skipProcessSignals?: boolean;
/*
* Timeout for waiting for at least one offer from the market.
* This parameter (set to 30 sec by default) will throw an error when executing `TaskExecutor.run`
* if no offer from the market is accepted before this time.
* You can set a slightly higher time in a situation where your parameters such as proposalFilter
* or minimum hardware requirements are quite restrictive and finding a suitable provider
* that meets these criteria may take a bit longer.
*/
startupTimeout?: number;
} & Omit<PackageOptions, "imageHash" | "imageTag"> &
MarketOptions &
TaskServiceOptions &
Expand Down Expand Up @@ -100,6 +109,7 @@ export class TaskExecutor {
private isRunning = true;
private configOptions: ExecutorOptions;
private isCanceled = false;
private startupTimeoutId?: NodeJS.Timeout;
private yagna: Yagna;

private signalHandler = (signal: string) => this.cancel(signal);
Expand Down Expand Up @@ -217,7 +227,7 @@ export class TaskExecutor {
this.logger?.debug("Initializing task executor services...");
const allocations = await this.paymentService.createAllocation();
await Promise.all([
this.marketService.run(taskPackage, allocations),
this.marketService.run(taskPackage, allocations).then(() => this.setStartupTimeout()),
this.agreementPoolService.run(),
this.paymentService.run(),
this.networkService?.run(),
Expand All @@ -239,6 +249,7 @@ export class TaskExecutor {
if (runtimeContextChecker.isNode) this.removeSignalHandlers();
if (!this.isRunning) return;
this.isRunning = false;
clearTimeout(this.startupTimeoutId);
if (!this.configOptions.storageProvider) await this.storageProvider?.close();
await this.networkService?.end();
await Promise.all([this.taskService.end(), this.agreementPoolService.end(), this.marketService.end()]);
Expand Down Expand Up @@ -499,4 +510,28 @@ export class TaskExecutor {
if (costsSummary.length) this.logger?.table?.(costsSummary);
this.logger?.info(`Total Cost: ${costs.total} Total Paid: ${costs.paid}`);
}

/**
* Sets a timeout for waiting for offers from the market.
* If at least one offer is not confirmed during the set timeout,
* a critical error will be reported and the entire process will be interrupted.
*/
private setStartupTimeout() {
this.startupTimeoutId = setTimeout(() => {
const proposalsCount = this.marketService.getProposalsCount();
if (proposalsCount.confirmed === 0) {
const hint =
proposalsCount.initial === 0 && proposalsCount.confirmed === 0
? "Check your demand if it's not too restrictive or restart yagna."
: proposalsCount.initial === proposalsCount.rejected
? "All off proposals got rejected."
: "Check your proposal filters if they are not too restrictive.";
this.handleCriticalError(
new Error(
`Could not start any work on Golem. Processed ${proposalsCount.initial} initial proposals from yagna, filters accepted ${proposalsCount.confirmed}. ${hint}`,
),
);
}
}, this.options.startupTimeout);
}
}
24 changes: 21 additions & 3 deletions src/market/service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { Logger, sleep } from "../utils";
import { YagnaApi, Logger, sleep } from "../utils";
import { Package } from "../package";
import { Proposal } from "./proposal";
import { AgreementPoolService } from "../agreement";
import { Allocation } from "../payment";
import { Demand, DemandEvent, DemandEventType, DemandOptions } from "./demand";
import { MarketConfig } from "./config";
import { YagnaApi } from "../utils/yagna/yagna";

export type ProposalFilter = (proposal: Proposal) => Promise<boolean> | boolean;

Expand All @@ -28,6 +27,11 @@ export class MarketService {
private logger?: Logger;
private taskPackage?: Package;
private maxResubscribeRetries = 5;
private proposalsCount = {
initial: 0,
confirmed: 0,
rejected: 0,
};

constructor(
private readonly agreementPoolService: AgreementPoolService,
Expand All @@ -53,10 +57,18 @@ export class MarketService {
this.logger?.debug("Market Service has been stopped");
}

getProposalsCount() {
return this.proposalsCount;
}
private async createDemand(): Promise<true> {
if (!this.taskPackage || !this.allocation) throw new Error("The service has not been started correctly.");
this.demand = await Demand.create(this.taskPackage, this.allocation, this.yagnaApi, this.options);
this.demand.addEventListener(DemandEventType, this.demandEventListener.bind(this));
this.proposalsCount = {
initial: 0,
confirmed: 0,
rejected: 0,
};
this.logger?.debug(`New demand has been created (${this.demand.id})`);
return true;
}
Expand All @@ -72,7 +84,10 @@ export class MarketService {
if (proposal.isInitial()) this.processInitialProposal(proposal);
else if (proposal.isDraft()) this.processDraftProposal(proposal);
else if (proposal.isExpired()) this.logger?.debug(`Proposal hes expired ${proposal.id}`);
else if (proposal.isRejected()) this.logger?.debug(`Proposal hes rejected ${proposal.id}`);
else if (proposal.isRejected()) {
this.proposalsCount.rejected++;
this.logger?.debug(`Proposal hes rejected ${proposal.id}`);
}
}

private async resubscribeDemand() {
Expand All @@ -92,6 +107,7 @@ export class MarketService {
private async processInitialProposal(proposal: Proposal) {
if (!this.allocation) throw new Error("The service has not been started correctly.");
this.logger?.debug(`New proposal has been received (${proposal.id})`);
this.proposalsCount.initial++;
try {
const { result: isProposalValid, reason } = await this.isProposalValid(proposal);
if (isProposalValid) {
Expand All @@ -101,6 +117,7 @@ export class MarketService {
.catch((e) => this.logger?.debug(`Unable to respond proposal ${proposal.id}. ${e}`));
this.logger?.debug(`Proposal has been responded (${proposal.id})`);
} else {
this.proposalsCount.rejected++;
this.logger?.debug(`Proposal has been rejected (${proposal.id}). Reason: ${reason}`);
}
} catch (error) {
Expand All @@ -122,6 +139,7 @@ export class MarketService {

private async processDraftProposal(proposal: Proposal) {
await this.agreementPoolService.addProposal(proposal);
this.proposalsCount.confirmed++;
this.logger?.debug(
`Proposal has been confirmed with provider ${proposal.issuerId} and added to agreement pool (${proposal.id})`,
);
Expand Down
2 changes: 2 additions & 0 deletions src/payment/payments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export class Payments extends EventTarget {
{ timeout: 0 },
);
for (const event of invoiceEvents) {
if (!this.isRunning) return;
if (event.eventType !== "InvoiceReceivedEvent") continue;
const invoice = await Invoice.create(event["invoiceId"], this.yagnaApi, { ...this.options }).catch(
(e) =>
Expand Down Expand Up @@ -95,6 +96,7 @@ export class Payments extends EventTarget {
)
.catch(() => ({ data: [] }));
for (const event of debitNotesEvents) {
if (!this.isRunning) return;
if (event.eventType !== "DebitNoteReceivedEvent") continue;
const debitNote = await DebitNote.create(event["debitNoteId"], this.yagnaApi, { ...this.options }).catch(
(e) =>
Expand Down
2 changes: 1 addition & 1 deletion src/payment/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export class PaymentService {
clearTimeout(timeoutId);
}
this.isRunning = false;
this.payments?.unsubscribe().catch((error) => this.logger?.warn(error));
await this.payments?.unsubscribe().catch((error) => this.logger?.warn(error));
this.payments?.removeEventListener(PaymentEventType, this.subscribePayments.bind(this));
await this.allocation?.release().catch((error) => this.logger?.warn(error));
this.logger?.info("Allocation has been released");
Expand Down
Loading

0 comments on commit 0b11ad8

Please sign in to comment.