Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(executor): added error when no proposals are received #607

Merged
merged 23 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/executor/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const DEFAULTS = Object.freeze({
taskTimeout: 1000 * 60 * 5, // 5 min,
maxTaskRetries: 3,
enableLogging: true,
startupTimeout: 1000 * 30, // 30 sec
});

/**
Expand All @@ -34,6 +35,7 @@ export class ExecutorConfig {
readonly maxTaskRetries: number;
readonly activityExecuteTimeout?: number;
readonly jobStorage: JobStorage;
readonly startupTimeout: number;

constructor(options: ExecutorOptions & ActivityOptions) {
const processEnv = !runtimeContextChecker.isBrowser
Expand Down Expand Up @@ -83,5 +85,6 @@ export class ExecutorConfig {
this.eventTarget = options.eventTarget || new EventTarget();
this.maxTaskRetries = options.maxTaskRetries ?? DEFAULTS.maxTaskRetries;
this.jobStorage = options.jobStorage || new InMemoryJobStorage();
this.startupTimeout = options.startupTimeout ?? DEFAULTS.startupTimeout;
}
}
39 changes: 38 additions & 1 deletion src/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ export type ExecutorOptions = {
* For more details see {@link JobStorage}. Defaults to a simple in-memory storage.
*/
jobStorage?: JobStorage;
/**
* Timeout for waiting for at least one offer from the market.
* This parameter (set to 30 sec by default) will throw an error when executing `TaskExecutor.run`
* if no offer from the market is accepted before this time.
* You can set a slightly higher time in a situation where your parameters such as proposalFilter
* or minimum hardware requirements are quite restrictive and finding a suitable provider
* that meets these criteria may take a bit longer.
* */
startupTimeout?: number;
} & Omit<PackageOptions, "imageHash" | "imageTag"> &
MarketOptions &
TaskServiceOptions &
Expand Down Expand Up @@ -91,6 +100,7 @@ export class TaskExecutor {
private isRunning = true;
private configOptions: ExecutorOptions;
private isCanceled = false;
private startupTimeoutId?: NodeJS.Timeout;
private yagna: Yagna;

/**
Expand Down Expand Up @@ -205,8 +215,10 @@ export class TaskExecutor {

this.logger?.debug("Initializing task executor services...");
const allocations = await this.paymentService.createAllocation();
this.agreementPoolService.run().catch((e) => this.handleCriticalError(e));
this.paymentService.run().catch((e) => this.handleCriticalError(e));
await Promise.all([
this.marketService.run(taskPackage, allocations),
this.marketService.run(taskPackage, allocations).then(() => this.setStartupTimeout()),
this.agreementPoolService.run(),
this.paymentService.run(),
this.networkService?.run(),
Expand All @@ -228,6 +240,7 @@ export class TaskExecutor {
if (runtimeContextChecker.isNode) this.removeCancelEvent();
if (!this.isRunning) return;
this.isRunning = false;
clearTimeout(this.startupTimeoutId);
if (!this.configOptions.storageProvider) await this.storageProvider?.close();
await this.networkService?.end();
await Promise.all([this.taskService.end(), this.agreementPoolService.end(), this.marketService.end()]);
Expand Down Expand Up @@ -482,4 +495,28 @@ export class TaskExecutor {
if (costsSummary.length) this.logger?.table?.(costsSummary);
this.logger?.info(`Total Cost: ${costs.total} Total Paid: ${costs.paid}`);
}

/**
* Sets a timeout for waiting for offers from the market.
* If at least one offer is not confirmed during the set timeout,
* a critical error will be reported and the entire process will be interrupted.
*/
private setStartupTimeout() {
this.startupTimeoutId = setTimeout(() => {
const proposalsCount = this.marketService.getProposalsCount();
if (proposalsCount.confirmed === 0) {
const hint =
proposalsCount.initial === 0 && proposalsCount.confirmed === 0
? "Check your demand if it's not too restrictive or restart yagna."
: proposalsCount.initial === proposalsCount.rejected
? "All off proposals got rejected."
: "Check your proposal filters if they are not too restrictive.";
this.handleCriticalError(
new Error(
`Could not start any work on Golem. Processed ${proposalsCount.initial} initial proposals from yagna, filters accepted ${proposalsCount.confirmed}. ${hint}`,
),
);
}
}, this.options.startupTimeout);
}
}
24 changes: 21 additions & 3 deletions src/market/service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { Logger, sleep } from "../utils";
import { YagnaApi, Logger, sleep } from "../utils";
import { Package } from "../package";
import { Proposal } from "./proposal";
import { AgreementPoolService } from "../agreement";
import { Allocation } from "../payment";
import { Demand, DemandEvent, DemandEventType, DemandOptions } from "./demand";
import { MarketConfig } from "./config";
import { YagnaApi } from "../utils/yagna/yagna";

export type ProposalFilter = (proposal: Proposal) => Promise<boolean> | boolean;

Expand All @@ -28,6 +27,11 @@ export class MarketService {
private logger?: Logger;
private taskPackage?: Package;
private maxResubscribeRetries = 5;
private proposalsCount = {
initial: 0,
confirmed: 0,
rejected: 0,
};

constructor(
private readonly agreementPoolService: AgreementPoolService,
Expand All @@ -53,10 +57,18 @@ export class MarketService {
this.logger?.debug("Market Service has been stopped");
}

getProposalsCount() {
return this.proposalsCount;
}
private async createDemand(): Promise<true> {
if (!this.taskPackage || !this.allocation) throw new Error("The service has not been started correctly.");
this.demand = await Demand.create(this.taskPackage, this.allocation, this.yagnaApi, this.options);
this.demand.addEventListener(DemandEventType, this.demandEventListener.bind(this));
this.proposalsCount = {
initial: 0,
confirmed: 0,
rejected: 0,
};
this.logger?.debug(`New demand has been created (${this.demand.id})`);
return true;
}
Expand All @@ -72,7 +84,10 @@ export class MarketService {
if (proposal.isInitial()) this.processInitialProposal(proposal);
else if (proposal.isDraft()) this.processDraftProposal(proposal);
else if (proposal.isExpired()) this.logger?.debug(`Proposal hes expired ${proposal.id}`);
else if (proposal.isRejected()) this.logger?.debug(`Proposal hes rejected ${proposal.id}`);
else if (proposal.isRejected()) {
this.proposalsCount.rejected++;
this.logger?.debug(`Proposal hes rejected ${proposal.id}`);
}
}

private async resubscribeDemand() {
Expand All @@ -92,6 +107,7 @@ export class MarketService {
private async processInitialProposal(proposal: Proposal) {
if (!this.allocation) throw new Error("The service has not been started correctly.");
this.logger?.debug(`New proposal has been received (${proposal.id})`);
this.proposalsCount.initial++;
try {
const { result: isProposalValid, reason } = await this.isProposalValid(proposal);
if (isProposalValid) {
Expand All @@ -101,6 +117,7 @@ export class MarketService {
.catch((e) => this.logger?.debug(`Unable to respond proposal ${proposal.id}. ${e}`));
this.logger?.debug(`Proposal has been responded (${proposal.id})`);
} else {
this.proposalsCount.rejected++;
this.logger?.debug(`Proposal has been rejected (${proposal.id}). Reason: ${reason}`);
}
} catch (error) {
Expand All @@ -122,6 +139,7 @@ export class MarketService {

private async processDraftProposal(proposal: Proposal) {
await this.agreementPoolService.addProposal(proposal);
this.proposalsCount.confirmed++;
this.logger?.debug(
`Proposal has been confirmed with provider ${proposal.issuerId} and added to agreement pool (${proposal.id})`,
);
Expand Down
Loading