Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(executor): added error when no proposals are received #607

Merged
merged 23 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/executor/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const DEFAULTS = Object.freeze({
taskTimeout: 1000 * 60 * 5, // 5 min,
maxTaskRetries: 3,
enableLogging: true,
startupTimeout: 1000 * 30, // 30 sec
});

/**
Expand All @@ -34,6 +35,7 @@ export class ExecutorConfig {
readonly maxTaskRetries: number;
readonly activityExecuteTimeout?: number;
readonly jobStorage: JobStorage;
readonly startupTimeout: number;

constructor(options: ExecutorOptions & ActivityOptions) {
const processEnv = !runtimeContextChecker.isBrowser
Expand Down Expand Up @@ -83,5 +85,6 @@ export class ExecutorConfig {
this.eventTarget = options.eventTarget || new EventTarget();
this.maxTaskRetries = options.maxTaskRetries ?? DEFAULTS.maxTaskRetries;
this.jobStorage = options.jobStorage || new InMemoryJobStorage();
this.startupTimeout = options.startupTimeout ?? DEFAULTS.startupTimeout;
}
}
30 changes: 30 additions & 0 deletions src/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ export type ExecutorOptions = {
* For more details see {@link JobStorage}. Defaults to a simple in-memory storage.
*/
jobStorage?: JobStorage;
/**
* Timeout for waiting for at least one offer from the market.
* This parameter (set to 30 sec by default) will throw an error when executing `TaskExecutor.run`
* if no offer from the market is accepted before this time.
* You can set a slightly higher time in a situation where your parameters such as proposalFilter
* or minimum hardware requirements are quite restrictive and finding a suitable provider
* that meets these criteria may take a bit longer.
* */
startupTimeout?: number;
} & Omit<PackageOptions, "imageHash" | "imageTag"> &
MarketOptions &
TaskServiceOptions &
Expand Down Expand Up @@ -91,7 +100,9 @@ export class TaskExecutor {
private isRunning = true;
private configOptions: ExecutorOptions;
private isCanceled = false;
private startupTimeoutId?: NodeJS.Timeout;
private yagna: Yagna;
private startupTimeoutError = false;

/**
* Create a new Task Executor
Expand Down Expand Up @@ -226,6 +237,7 @@ export class TaskExecutor {
if (runtimeContextChecker.isNode) this.removeCancelEvent();
if (!this.isRunning) return;
this.isRunning = false;
clearTimeout(this.startupTimeoutId);
if (!this.configOptions.storageProvider) await this.storageProvider?.close();
await this.networkService?.end();
await Promise.all([this.taskService.end(), this.agreementPoolService.end(), this.marketService.end()]);
Expand Down Expand Up @@ -385,11 +397,29 @@ export class TaskExecutor {
timeout: options?.timeout ?? this.options.taskTimeout,
});
this.taskQueue.addToEnd(task as Task<unknown, unknown>);
if (!this.startupTimeoutError && !this.startupTimeoutId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have few issues with this, we can talk about this f2f:

  • This way of communicating the error with the timeout isn't easy for the user to handle, we should work on that
  • Since this is implemented as part of executeTask, this timeout will be started and counted for each scheduled task. I do see a concurrency problem here, given all the tasks are executed on the same TaskExecutor instance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this timeout will only start once, unless this.startupTimeoutId exists yet, so there shouldn't be a problem with concurrency, I think, but let's talk about it..

this.startupTimeoutId = setTimeout(
() => (this.startupTimeoutError = this.marketService.getProposalsCount().confirmed === 0),
this.options.startupTimeout,
);
}
while (this.isRunning) {
if (task.isFinished()) {
if (task.isRejected()) throw task.getError();
return task.getResults();
}
if (this.startupTimeoutError) {
const proposalsCount = this.marketService.getProposalsCount();
const hint =
proposalsCount.initial === 0 && proposalsCount.confirmed === 0
? "Check your demand if it's not too restrictive or restart yagna."
: proposalsCount.initial === proposalsCount.rejected
? "All off proposals got rejected."
: "Check your proposal filters if they are not too restrictive.";
throw new Error(
`Could not start any work on Golem. Processed ${proposalsCount.initial} initial proposals from yagna, filters accepted ${proposalsCount.confirmed}. ${hint}`,
);
}
await sleep(2000, true);
}
}
Expand Down
24 changes: 21 additions & 3 deletions src/market/service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { Logger, sleep } from "../utils";
import { YagnaApi, Logger, sleep } from "../utils";
import { Package } from "../package";
import { Proposal } from "./proposal";
import { AgreementPoolService } from "../agreement";
import { Allocation } from "../payment";
import { Demand, DemandEvent, DemandEventType, DemandOptions } from "./demand";
import { MarketConfig } from "./config";
import { YagnaApi } from "../utils/yagna/yagna";

export type ProposalFilter = (proposal: Proposal) => Promise<boolean> | boolean;

Expand All @@ -28,6 +27,11 @@ export class MarketService {
private logger?: Logger;
private taskPackage?: Package;
private maxResubscribeRetries = 5;
private proposalsCount = {
initial: 0,
confirmed: 0,
rejected: 0,
};

constructor(
private readonly agreementPoolService: AgreementPoolService,
Expand All @@ -53,10 +57,18 @@ export class MarketService {
this.logger?.debug("Market Service has been stopped");
}

getProposalsCount() {
return this.proposalsCount;
}
private async createDemand(): Promise<true> {
if (!this.taskPackage || !this.allocation) throw new Error("The service has not been started correctly.");
this.demand = await Demand.create(this.taskPackage, this.allocation, this.yagnaApi, this.options);
this.demand.addEventListener(DemandEventType, this.demandEventListener.bind(this));
this.proposalsCount = {
initial: 0,
confirmed: 0,
rejected: 0,
};
this.logger?.debug(`New demand has been created (${this.demand.id})`);
return true;
}
Expand All @@ -72,7 +84,10 @@ export class MarketService {
if (proposal.isInitial()) this.processInitialProposal(proposal);
else if (proposal.isDraft()) this.processDraftProposal(proposal);
else if (proposal.isExpired()) this.logger?.debug(`Proposal hes expired ${proposal.id}`);
else if (proposal.isRejected()) this.logger?.debug(`Proposal hes rejected ${proposal.id}`);
else if (proposal.isRejected()) {
this.proposalsCount.rejected++;
this.logger?.debug(`Proposal hes rejected ${proposal.id}`);
}
}

private async resubscribeDemand() {
Expand All @@ -92,6 +107,7 @@ export class MarketService {
private async processInitialProposal(proposal: Proposal) {
if (!this.allocation) throw new Error("The service has not been started correctly.");
this.logger?.debug(`New proposal has been received (${proposal.id})`);
this.proposalsCount.initial++;
try {
const { result: isProposalValid, reason } = await this.isProposalValid(proposal);
if (isProposalValid) {
Expand All @@ -101,6 +117,7 @@ export class MarketService {
.catch((e) => this.logger?.debug(`Unable to respond proposal ${proposal.id}. ${e}`));
this.logger?.debug(`Proposal has been responded (${proposal.id})`);
} else {
this.proposalsCount.rejected++;
this.logger?.debug(`Proposal has been rejected (${proposal.id}). Reason: ${reason}`);
}
} catch (error) {
Expand All @@ -122,6 +139,7 @@ export class MarketService {

private async processDraftProposal(proposal: Proposal) {
await this.agreementPoolService.addProposal(proposal);
this.proposalsCount.confirmed++;
this.logger?.debug(
`Proposal has been confirmed with provider ${proposal.issuerId} and added to agreement pool (${proposal.id})`,
);
Expand Down
Loading