Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(executor): added error when no proposals are received #607

Merged
merged 23 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/examples-nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:

- name: Fund the requestor
# Use a funding script which will retry funding the requestor 3 times, else it exits with error. The faucet is not reliable and sometimes fails to fund the requestor, thus the retry.
run: sleep 4 && docker exec -t docker-requestor-1 /bin/sh -c "/golem-js/tests/docker/fundRequestor.sh"
run: sleep 10 && docker exec -t docker-requestor-1 /bin/sh -c "/golem-js/tests/docker/fundRequestor.sh"

- name: Install and build the SDK in the docker container
run: |
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/goth-nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ jobs:

- name: Fund the requestor
# Use a funding script which will retry funding the requestor 3 times, else it exits with error. The faucet is not reliable and sometimes fails to fund the requestor, thus the retry.
run: sleep 4 && docker exec -t docker-requestor-1 /bin/sh -c "/golem-js/tests/docker/fundRequestor.sh"
run: sleep 10 && docker exec -t docker-requestor-1 /bin/sh -c "/golem-js/tests/docker/fundRequestor.sh"

- name: Install and build the SDK in the docker container
run: |
docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && npm i && npm run build"

- name: Start the e2e test
run: docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && npm i && npm run test:e2e"
run: docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && npm i && npm run test:e2e -- --reporters github-actions --reporters summary"

- name: Cleanup Docker
if: always()
Expand Down
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,21 @@ First, build the Docker containers using the `docker-compose.yml` file located u

Execute this command to build the Docker containers:

docker compose -f tests/docker/docker-compose.yml build
docker-compose -f tests/docker/docker-compose.yml build

##### Start Docker Containers

Then, launch the Docker containers you've just built using the same `docker-compose.yml` file.

Execute this command to start the Docker containers:

docker compose -f tests/docker/docker-compose.yml down && docker compose -f tests/docker/docker-compose.yml up -d
docker-compose -f tests/docker/docker-compose.yml down && docker-compose -f tests/docker/docker-compose.yml up -d

##### Fund the Requestor

The next step is to fund the requestor.

docker exec -t docker-requestor-1 /bin/sh -c "/golem-js/tests/docker/fundRequestor.sh"
docker exec -t docker_requestor_1 /bin/sh -c "/golem-js/tests/docker/fundRequestor.sh"

##### Install and Build the SDK

Expand All @@ -154,29 +154,29 @@ Finally, install and build the golem-js SDK in the Docker container
Run this chain of commands to install and build the SDK and prepare cypress.

```docker
docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && npm i && npm run build && ./node_modules/.bin/cypress install"
docker exec -t docker_requestor_1 /bin/sh -c "cd /golem-js && npm i && npm run build && ./node_modules/.bin/cypress install"
```

#### Execute the E2E Tests

With your test environment set up, you can now initiate the E2E tests. Run the following command to start:

```docker
docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && npm run test:e2e"
docker exec -t docker_requestor_1 /bin/sh -c "cd /golem-js && npm run test:e2e"
```

#### Execute the cypress Tests

First make sure that the webserver that's used for testing is running, by running the command

```docker
docker exec -t -d docker-requestor-1 /bin/sh -c "cd /golem-js/examples/web && node app.mjs"
docker exec -t -d docker_requestor_1 /bin/sh -c "cd /golem-js/examples/web && node app.mjs"
```

Now you're ready to start the cypress tests by running the command

```docker
docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && npm run test:cypress -- --browser chromium"
docker exec -t docker_requestor_1 /bin/sh -c "cd /golem-js && npm run test:cypress -- --browser chromium"
```

### Contributing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const myFilter = async (proposal) => {
package: "9a3b5d67b0b27746283cb5f287c13eab1beaa12d92a9f536b747c7ae",
proposalFilter: myFilter,
yagnaOptions: { apiKey: "try_golem" },
startupTimeout: 60_000,
});
await executor.run(async (ctx) =>
console.log((await ctx.run(`echo "This task is run on ${ctx.provider.id}"`)).stdout, ctx.provider.id),
Expand Down
7 changes: 5 additions & 2 deletions rollup.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export default [
commonjs(),
nodePolyfills(),
json(), // Required because one our dependencies (bottleneck) loads its own 'version.json'
typescript({ tsconfig: "./tsconfig.json" }),
typescript({ tsconfig: "./tsconfig.json", exclude: ["**/__tests__", "**/*.test.ts"] }),
terser({ keep_classnames: true }),
filesize({ reporter: [sizeValidator, "boxen"] }),
],
Expand All @@ -54,7 +54,10 @@ export default [
{ file: pkg.main, format: "cjs", sourcemap: true },
{ file: pkg.module, format: "es", sourcemap: true },
],
plugins: [typescript({ tsconfig: "./tsconfig.json" }), filesize({ reporter: [sizeValidator, "boxen"] })],
plugins: [
typescript({ tsconfig: "./tsconfig.json", exclude: ["**/__tests__", "**/*.test.ts"] }),
filesize({ reporter: [sizeValidator, "boxen"] }),
],
},
];

Expand Down
3 changes: 3 additions & 0 deletions src/executor/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const DEFAULTS = Object.freeze({
taskTimeout: 1000 * 60 * 5, // 5 min,
maxTaskRetries: 3,
enableLogging: true,
startupTimeout: 1000 * 30, // 30 sec
});

/**
Expand All @@ -34,6 +35,7 @@ export class ExecutorConfig {
readonly maxTaskRetries: number;
readonly activityExecuteTimeout?: number;
readonly jobStorage: JobStorage;
readonly startupTimeout: number;

constructor(options: ExecutorOptions & ActivityOptions) {
const processEnv = !runtimeContextChecker.isBrowser
Expand Down Expand Up @@ -83,5 +85,6 @@ export class ExecutorConfig {
this.eventTarget = options.eventTarget || new EventTarget();
this.maxTaskRetries = options.maxTaskRetries ?? DEFAULTS.maxTaskRetries;
this.jobStorage = options.jobStorage || new InMemoryJobStorage();
this.startupTimeout = options.startupTimeout ?? DEFAULTS.startupTimeout;
}
}
64 changes: 64 additions & 0 deletions src/executor/executor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { MarketService } from "../market/";
import { AgreementPoolService } from "../agreement/";
import { TaskService } from "../task/";
import { TaskExecutor } from "./executor";
import { sleep } from "../utils";
import { LoggerMock } from "../../tests/mock";

jest.mock("../market/service");
jest.mock("../agreement/service");
jest.mock("../network/service");
jest.mock("../task/service");
jest.mock("../storage/gftp");
jest.mock("../utils/yagna/yagna");

const serviceRunSpy = jest.fn().mockImplementation(() => Promise.resolve());
jest.spyOn(MarketService.prototype, "run").mockImplementation(serviceRunSpy);
jest.spyOn(AgreementPoolService.prototype, "run").mockImplementation(serviceRunSpy);
jest.spyOn(TaskService.prototype, "run").mockImplementation(serviceRunSpy);

jest.mock("../payment/service", () => {
return {
PaymentService: jest.fn().mockImplementation(() => {
return {
config: { payment: { network: "test" } },
createAllocation: jest.fn(),
run: serviceRunSpy,
end: jest.fn(),
};
}),
};
});

describe("Task Executor", () => {
const logger = new LoggerMock();
const yagnaOptions = { apiKey: "test" };
beforeEach(() => {
jest.clearAllMocks();
logger.clear();
});

describe("init()", () => {
it("should run all set services", async () => {
const executor = await TaskExecutor.create({ package: "test", logger, yagnaOptions });
expect(serviceRunSpy).toHaveBeenCalledTimes(4);
expect(executor).toBeDefined();
await executor.end();
});
it("should handle a critical error if startup timeout is reached", async () => {
const executor = await TaskExecutor.create({ package: "test", startupTimeout: 0, logger, yagnaOptions });
jest
.spyOn(MarketService.prototype, "getProposalsCount")
.mockImplementation(() => ({ confirmed: 0, initial: 0, rejected: 0 }));
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const handleErrorSpy = jest.spyOn(executor as any, "handleCriticalError").mockImplementation((error) => {
expect((error as Error).message).toEqual(
"Could not start any work on Golem. Processed 0 initial proposals from yagna, filters accepted 0. Check your demand if it's not too restrictive or restart yagna.",
);
});
await sleep(10, true);
expect(handleErrorSpy).toHaveBeenCalled();
await executor.end();
});
});
});
37 changes: 36 additions & 1 deletion src/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ export type ExecutorOptions = {
* For more details see {@link JobStorage}. Defaults to a simple in-memory storage.
*/
jobStorage?: JobStorage;
/**
* Timeout for waiting for at least one offer from the market.
* This parameter (set to 30 sec by default) will throw an error when executing `TaskExecutor.run`
* if no offer from the market is accepted before this time.
* You can set a slightly higher time in a situation where your parameters such as proposalFilter
* or minimum hardware requirements are quite restrictive and finding a suitable provider
* that meets these criteria may take a bit longer.
* */
startupTimeout?: number;
} & Omit<PackageOptions, "imageHash" | "imageTag"> &
MarketOptions &
TaskServiceOptions &
Expand Down Expand Up @@ -91,6 +100,7 @@ export class TaskExecutor {
private isRunning = true;
private configOptions: ExecutorOptions;
private isCanceled = false;
private startupTimeoutId?: NodeJS.Timeout;
private yagna: Yagna;

/**
Expand Down Expand Up @@ -206,7 +216,7 @@ export class TaskExecutor {
this.logger?.debug("Initializing task executor services...");
const allocations = await this.paymentService.createAllocation();
await Promise.all([
this.marketService.run(taskPackage, allocations),
this.marketService.run(taskPackage, allocations).then(() => this.setStartupTimeout()),
this.agreementPoolService.run(),
this.paymentService.run(),
this.networkService?.run(),
Expand All @@ -228,6 +238,7 @@ export class TaskExecutor {
if (runtimeContextChecker.isNode) this.removeCancelEvent();
if (!this.isRunning) return;
this.isRunning = false;
clearTimeout(this.startupTimeoutId);
if (!this.configOptions.storageProvider) await this.storageProvider?.close();
await this.networkService?.end();
await Promise.all([this.taskService.end(), this.agreementPoolService.end(), this.marketService.end()]);
Expand Down Expand Up @@ -482,4 +493,28 @@ export class TaskExecutor {
if (costsSummary.length) this.logger?.table?.(costsSummary);
this.logger?.info(`Total Cost: ${costs.total} Total Paid: ${costs.paid}`);
}

/**
* Sets a timeout for waiting for offers from the market.
* If at least one offer is not confirmed during the set timeout,
* a critical error will be reported and the entire process will be interrupted.
*/
private setStartupTimeout() {
this.startupTimeoutId = setTimeout(() => {
const proposalsCount = this.marketService.getProposalsCount();
if (proposalsCount.confirmed === 0) {
const hint =
proposalsCount.initial === 0 && proposalsCount.confirmed === 0
? "Check your demand if it's not too restrictive or restart yagna."
: proposalsCount.initial === proposalsCount.rejected
? "All off proposals got rejected."
: "Check your proposal filters if they are not too restrictive.";
this.handleCriticalError(
new Error(
`Could not start any work on Golem. Processed ${proposalsCount.initial} initial proposals from yagna, filters accepted ${proposalsCount.confirmed}. ${hint}`,
),
);
}
}, this.options.startupTimeout);
}
}
24 changes: 21 additions & 3 deletions src/market/service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { Logger, sleep } from "../utils";
import { YagnaApi, Logger, sleep } from "../utils";
import { Package } from "../package";
import { Proposal } from "./proposal";
import { AgreementPoolService } from "../agreement";
import { Allocation } from "../payment";
import { Demand, DemandEvent, DemandEventType, DemandOptions } from "./demand";
import { MarketConfig } from "./config";
import { YagnaApi } from "../utils/yagna/yagna";

export type ProposalFilter = (proposal: Proposal) => Promise<boolean> | boolean;

Expand All @@ -28,6 +27,11 @@ export class MarketService {
private logger?: Logger;
private taskPackage?: Package;
private maxResubscribeRetries = 5;
private proposalsCount = {
initial: 0,
confirmed: 0,
rejected: 0,
};

constructor(
private readonly agreementPoolService: AgreementPoolService,
Expand All @@ -53,10 +57,18 @@ export class MarketService {
this.logger?.debug("Market Service has been stopped");
}

getProposalsCount() {
return this.proposalsCount;
}
private async createDemand(): Promise<true> {
if (!this.taskPackage || !this.allocation) throw new Error("The service has not been started correctly.");
this.demand = await Demand.create(this.taskPackage, this.allocation, this.yagnaApi, this.options);
this.demand.addEventListener(DemandEventType, this.demandEventListener.bind(this));
this.proposalsCount = {
initial: 0,
confirmed: 0,
rejected: 0,
};
this.logger?.debug(`New demand has been created (${this.demand.id})`);
return true;
}
Expand All @@ -72,7 +84,10 @@ export class MarketService {
if (proposal.isInitial()) this.processInitialProposal(proposal);
else if (proposal.isDraft()) this.processDraftProposal(proposal);
else if (proposal.isExpired()) this.logger?.debug(`Proposal hes expired ${proposal.id}`);
else if (proposal.isRejected()) this.logger?.debug(`Proposal hes rejected ${proposal.id}`);
else if (proposal.isRejected()) {
this.proposalsCount.rejected++;
this.logger?.debug(`Proposal hes rejected ${proposal.id}`);
}
}

private async resubscribeDemand() {
Expand All @@ -92,6 +107,7 @@ export class MarketService {
private async processInitialProposal(proposal: Proposal) {
if (!this.allocation) throw new Error("The service has not been started correctly.");
this.logger?.debug(`New proposal has been received (${proposal.id})`);
this.proposalsCount.initial++;
try {
const { result: isProposalValid, reason } = await this.isProposalValid(proposal);
if (isProposalValid) {
Expand All @@ -101,6 +117,7 @@ export class MarketService {
.catch((e) => this.logger?.debug(`Unable to respond proposal ${proposal.id}. ${e}`));
this.logger?.debug(`Proposal has been responded (${proposal.id})`);
} else {
this.proposalsCount.rejected++;
this.logger?.debug(`Proposal has been rejected (${proposal.id}). Reason: ${reason}`);
}
} catch (error) {
Expand All @@ -122,6 +139,7 @@ export class MarketService {

private async processDraftProposal(proposal: Proposal) {
await this.agreementPoolService.addProposal(proposal);
this.proposalsCount.confirmed++;
this.logger?.debug(
`Proposal has been confirmed with provider ${proposal.issuerId} and added to agreement pool (${proposal.id})`,
);
Expand Down
2 changes: 2 additions & 0 deletions src/payment/payments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export class Payments extends EventTarget {
{ timeout: 0 },
);
for (const event of invoiceEvents) {
if (!this.isRunning) return;
if (event.eventType !== "InvoiceReceivedEvent") continue;
const invoice = await Invoice.create(event["invoiceId"], this.yagnaApi, { ...this.options }).catch(
(e) =>
Expand Down Expand Up @@ -95,6 +96,7 @@ export class Payments extends EventTarget {
)
.catch(() => ({ data: [] }));
for (const event of debitNotesEvents) {
if (!this.isRunning) return;
if (event.eventType !== "DebitNoteReceivedEvent") continue;
const debitNote = await DebitNote.create(event["debitNoteId"], this.yagnaApi, { ...this.options }).catch(
(e) =>
Expand Down
2 changes: 1 addition & 1 deletion src/payment/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export class PaymentService {
clearTimeout(timeoutId);
}
this.isRunning = false;
this.payments?.unsubscribe().catch((error) => this.logger?.warn(error));
await this.payments?.unsubscribe().catch((error) => this.logger?.warn(error));
this.payments?.removeEventListener(PaymentEventType, this.subscribePayments.bind(this));
await this.allocation?.release().catch((error) => this.logger?.warn(error));
this.logger?.info("Allocation has been released");
Expand Down
Loading
Loading