From 523d32ad4682bf235eb82bfeff00478385e750d3 Mon Sep 17 00:00:00 2001 From: filipgolem <44880692+filipgolem@users.noreply.github.com> Date: Mon, 15 Feb 2021 10:59:04 +0100 Subject: [PATCH 1/9] Market strategy that decreases score for providers that fail to confirm agreements (#126) --- yajsapi/executor/index.ts | 25 ++++++++++++++++++----- yajsapi/executor/strategy.ts | 39 +++++++++++++++++++++++++++++++++--- yajsapi/rest/market.ts | 2 +- 3 files changed, 57 insertions(+), 9 deletions(-) diff --git a/yajsapi/executor/index.ts b/yajsapi/executor/index.ts index d830e64ba..63d74d8b6 100755 --- a/yajsapi/executor/index.ts +++ b/yajsapi/executor/index.ts @@ -33,7 +33,13 @@ export const sgx = _sgx; export const vm = _vm; import { Task, TaskStatus } from "./task"; import { Consumer, SmartQueue } from "./smartq"; -import { LeastExpensiveLinearPayuMS, MarketStrategy, SCORE_NEUTRAL } from "./strategy"; +import { + ComputationHistory, + DecreaseScoreForUnconfirmedAgreement, + LeastExpensiveLinearPayuMS, + MarketStrategy, + SCORE_NEUTRAL +} from "./strategy"; import { Package } from "../package"; export { Task, TaskStatus }; @@ -116,7 +122,7 @@ export type ExecutorOpts = { * * @description Used to run tasks using the specified application package within providers' execution units. */ -export class Executor { +export class Executor implements ComputationHistory { private _subnet; private _driver; private _network; @@ -129,6 +135,7 @@ export class Executor { private _expires; private _budget_amount; private _budget_allocations: Allocation[]; + private _rejecting_providers: Set; private _activity_api; private _market_api; @@ -156,7 +163,7 @@ export class Executor { max_workers = 5, timeout = dayjs.duration({ minutes: 5 }).asMilliseconds(), budget, - strategy = new LeastExpensiveLinearPayuMS(), + strategy, subnet_tag, driver, network, @@ -166,7 +173,8 @@ export class Executor { this._driver = driver ? driver.toLowerCase() : DEFAULT_DRIVER; this._network = network ? network.toLowerCase() : DEFAULT_NETWORK; this._stream_output = false; - this._strategy = strategy; + this._strategy = + strategy || new DecreaseScoreForUnconfirmedAgreement(new LeastExpensiveLinearPayuMS(), 0.5); this._api_config = new rest.Configuration(); this._stack = new AsyncExitStack(); this._task_package = task_package; @@ -174,6 +182,7 @@ export class Executor { // TODO: setup precision this._budget_amount = parseFloat(budget); this._budget_allocations = []; + this._rejecting_providers = new Set(); this._cancellation_token = new CancellationToken(); let cancellationToken = this._cancellation_token; @@ -389,7 +398,7 @@ export class Executor { offers_collected += 1; let score; try { - score = await strategy.score_offer(proposal); + score = await strategy.score_offer(proposal, self); logger.debug(`Scored offer ${proposal.id()}, ` + `provider: ${proposal.props()["golem.node.id.name"]}, ` + `strategy: ${strategy.constructor.name}, ` + @@ -677,8 +686,10 @@ export class Executor { if (self._worker_cancellation_token.cancelled) { break; } if (!(await agreement.confirm())) { emit(new events.AgreementRejected({ agr_id: agreement.id() })); + self._rejecting_providers.add(provider_id); continue; } + self._rejecting_providers.delete(provider_id); emit(new events.AgreementConfirmed({ agr_id: agreement.id() })); if (self._worker_cancellation_token.cancelled) { break; } new_task = loop.create_task(_start_worker.bind(null, agreement)); @@ -873,6 +884,10 @@ export class Executor { } } + rejected_last_agreement(provider_id: string): boolean { + return this._rejecting_providers.has(provider_id); + } + async ready(): Promise { let stack = this._stack; // TODO: Cleanup on exception here. diff --git a/yajsapi/executor/strategy.ts b/yajsapi/executor/strategy.ts index d8ea11b15..703e49c74 100644 --- a/yajsapi/executor/strategy.ts +++ b/yajsapi/executor/strategy.ts @@ -19,12 +19,16 @@ export const CFF_DEFAULT_PRICE_FOR_COUNTER: Map = new Map([ [Counter.CPU, parseFloat("0.002") * 10], ]); +export interface ComputationHistory { + rejected_last_agreement: (string) => boolean; +} + export class MarketStrategy { /*Abstract market strategy*/ async decorate_demand(demand: DemandBuilder): Promise {} - async score_offer(offer: OfferProposal): Promise { + async score_offer(offer: OfferProposal, history?: ComputationHistory): Promise { return SCORE_REJECTED; } } @@ -44,7 +48,7 @@ export class DummyMS extends MarketGeneral { this._activity = new Activity().from_properties(demand._properties); } - async score_offer(offer: OfferProposal): Promise { + async score_offer(offer: OfferProposal, history?: ComputationHistory): Promise { const linear: ComLinear = new ComLinear().from_properties(offer.props()); if (linear.scheme.value !== BillingScheme.PAYU) { @@ -73,7 +77,7 @@ export class LeastExpensiveLinearPayuMS { demand.ensure(`(${PRICE_MODEL}=${PriceModel.LINEAR})`); } - async score_offer(offer: OfferProposal): Promise { + async score_offer(offer: OfferProposal, history?: ComputationHistory): Promise { const linear: ComLinear = new ComLinear().from_properties(offer.props()); logger.debug(`Scoring offer ${offer.id()}, parameters: ${JSON.stringify(linear)}`); @@ -111,3 +115,32 @@ export class LeastExpensiveLinearPayuMS { return score; } } + +export class DecreaseScoreForUnconfirmedAgreement { + /* A market strategy that modifies a base strategy based on history of agreements. */ + + private _base_strategy; + private _factor; + + constructor(base_strategy, factor) { + this._base_strategy = base_strategy; + this._factor = factor; + } + + async decorate_demand(demand: DemandBuilder): Promise { + /* Decorate `demand` using the base strategy. */ + await this._base_strategy.decorate_demand(demand); + } + + async score_offer(offer: OfferProposal, history?: ComputationHistory): Promise { + /* Score `offer` using the base strategy and apply penalty if needed. + If the offer issuer failed to approve the previous agreement (if any) + then the base score is multiplied by `this._factor`. */ + let score = await this._base_strategy.score_offer(offer); + if (history && history.rejected_last_agreement(offer.issuer()) && score > 0) { + score *= this._factor; + logger.debug(`Decreasing score for offer ${offer.id()} from '${offer.issuer()}'`); + } + return score; + } +} diff --git a/yajsapi/rest/market.ts b/yajsapi/rest/market.ts index f9db195c4..8e74ce6ae 100755 --- a/yajsapi/rest/market.ts +++ b/yajsapi/rest/market.ts @@ -67,7 +67,7 @@ export class Agreement { async confirm(): Promise { await this._api.confirmAgreement(this._id); try { - let { data: msg } = await this._api.waitForApproval(this._id, 90, { timeout: 100000 }); + let { data: msg } = await this._api.waitForApproval(this._id, 15, { timeout: 16000 }); return true; } catch (error) { logger.debug(`waitForApproval(${this._id}) raised ApiException ${error}`); From 521cea60954608540fe92460c5be4578290c8e3d Mon Sep 17 00:00:00 2001 From: filipgolem <44880692+filipgolem@users.noreply.github.com> Date: Mon, 15 Feb 2021 10:59:38 +0100 Subject: [PATCH 2/9] Command output logs & getExecBatchResults timeout fix (#125) * Add command output logs (logged only to file and only when -d is specified) * Increase failed command output log level to debug * Add client-side timeout for getExecBatchResults * Bump version --- package.json | 2 +- yajsapi/rest/activity.ts | 5 ++++- yajsapi/utils/log.ts | 28 +++++++++++----------------- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/package.json b/package.json index efb1a439f..ee1863af1 100755 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "yajsapi", - "version": "0.3.0-alpha.7", + "version": "0.3.0-alpha.8", "description": "NodeJS API for Next Golem", "repository": "https://github.com/golemfactory/yajsapi", "main": "dist/index.js", diff --git a/yajsapi/rest/activity.ts b/yajsapi/rest/activity.ts index 34bc795ab..a294ae7ea 100755 --- a/yajsapi/rest/activity.ts +++ b/yajsapi/rest/activity.ts @@ -426,12 +426,15 @@ class PollingBatch extends Batch { this.activity_id, this.batch_id, undefined, - { params: { timeout: 5 } } + { timeout: 5000 } ); results = data; } catch (error) { + const timeout_msg = error.message && error.message.includes("timeout"); if (error.response && error.response.status === 408) { continue; + } else if (error.code === "ETIMEDOUT" || (error.code === "ECONNABORTED" && timeout_msg)) { + continue; } else { if (error.response && error.response.status == 500 && error.response.data) { throw new CommandExecutionError( diff --git a/yajsapi/utils/log.ts b/yajsapi/utils/log.ts index 25a232f9f..f39a2b5a2 100644 --- a/yajsapi/utils/log.ts +++ b/yajsapi/utils/log.ts @@ -231,18 +231,13 @@ class SummaryLogger { ); } else if (eventName === events.CommandExecuted.name) { const provider_name = this.agreement_provider_info[event["agr_id"]].name; + const cmd = JSON.stringify(event["command"]); if (event["success"]) { - logger.debug( - `Command successful on provider '${provider_name}', command: ${JSON.stringify( - event["command"] - )}.` - ); + logger.debug(`Command successful on provider '${provider_name}', command: ${cmd}.`); + logger.silly(`Command ${cmd}: stdout: ${event["stdout"]}, stderr: ${event["stderr"]}, msg: ${event["message"]}.`); } else { - logger.warn( - `Command failed on provider '${provider_name}', command: ${JSON.stringify( - event["command"] - )}, output: ${event["message"]}` - ); + logger.warn(`Command failed on provider '${provider_name}', command: ${cmd}, msg: ${event["message"]}`); + logger.debug(`Command ${cmd}: stdout: ${event["stdout"]}, stderr: ${event["stderr"]}.`); } } else if (eventName === events.ScriptFinished.name) { const provider_info = this.agreement_provider_info[event["agr_id"]]; @@ -353,22 +348,21 @@ export function logSummary( export const changeLogLevel = (level: string) => { options.level = level; - options.transports.push( + options.transports = [ + new winston.transports.Console({ level: level }), new winston.transports.File({ filename: path.join( "logs", `yajsapi-${dayjs().format("YYYY-MM-DD_HH-mm-ss")}.log` ), - level: "debug", - }) as any - ); - options.transports.push( + level: "silly", + }) as any, new winston.transports.File({ filename: path.join("logs", "yajsapi-current.log"), - level: "debug", + level: "silly", options: { flags: "w" }, }) as any - ); + ]; logger.configure(options); }; From 8ba1bbb9ab570477361244a556e9acdaeb099dc7 Mon Sep 17 00:00:00 2001 From: Filip Date: Mon, 15 Feb 2021 19:32:47 +0100 Subject: [PATCH 3/9] Bump version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index ee1863af1..8dac12da0 100755 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "yajsapi", - "version": "0.3.0-alpha.8", + "version": "0.3.1-alpha.1", "description": "NodeJS API for Next Golem", "repository": "https://github.com/golemfactory/yajsapi", "main": "dist/index.js", From 948e2a8b1517e07e88efd46ceff32e388502a004 Mon Sep 17 00:00:00 2001 From: Filip Date: Tue, 16 Feb 2021 11:57:33 +0100 Subject: [PATCH 4/9] Display payment allocation error message (closes #121) --- yajsapi/rest/payment.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/yajsapi/rest/payment.ts b/yajsapi/rest/payment.ts index 0cae8257e..34d3fb313 100755 --- a/yajsapi/rest/payment.ts +++ b/yajsapi/rest/payment.ts @@ -171,7 +171,8 @@ class _AllocationTask extends ResourceCtx { _allocation.expires = new Date(parseInt(model.timeout) * 1000); return _allocation; } catch (error) { - logger.error(error); + const msg = error.response && error.response.data ? error.response.data.message : error.message; + logger.error(`Payment allocation error (message: ${msg}). Please run "yagna payment status" to check your account.`); throw new Error(error); } } From fdcad35656cb2117908495e5b38133908632a94d Mon Sep 17 00:00:00 2001 From: Filip Date: Tue, 16 Feb 2021 12:43:34 +0100 Subject: [PATCH 5/9] Use paymentCancellationToken to cancel processing invoices --- yajsapi/executor/index.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/yajsapi/executor/index.ts b/yajsapi/executor/index.ts index 63d74d8b6..90f1956f5 100755 --- a/yajsapi/executor/index.ts +++ b/yajsapi/executor/index.ts @@ -144,6 +144,7 @@ export class Executor implements ComputationHistory { private _wrapped_consumer; private _cancellation_token: CancellationToken; private _worker_cancellation_token: CancellationToken; + private _payment_cancellation_token: CancellationToken; /** * Create new executor @@ -190,6 +191,8 @@ export class Executor implements ComputationHistory { this._worker_cancellation_token = new CancellationToken(); let workerCancellationToken = this._worker_cancellation_token; + this._payment_cancellation_token = new CancellationToken(); + function cancel(event) { if (cancellationToken && !cancellationToken.cancelled) { cancellationToken.cancel(); @@ -256,6 +259,7 @@ export class Executor implements ComputationHistory { let activity_api = this._activity_api; let strategy = this._strategy; let cancellationToken = this._cancellation_token; + let paymentCancellationToken = this._payment_cancellation_token; let done_queue: Queue> = new Queue([]); let stream_output = this._stream_output; @@ -286,7 +290,7 @@ export class Executor implements ComputationHistory { async function process_invoices(): Promise { for await (let invoice of self._payment_api.incoming_invoices( - cancellationToken + paymentCancellationToken )) { if (agreements_to_pay.has(invoice.agreementId)) { emit( @@ -812,6 +816,8 @@ export class Executor implements ComputationHistory { emit(new events.CheckingPayments()); } } + if (!self._payment_cancellation_token.cancelled) + self._payment_cancellation_token.cancel(); emit(new events.PaymentsFinished()); await sleep(2); cancellationToken.cancel(); From b16fa563757a34497b4fe0641695cdc005d5628b Mon Sep 17 00:00:00 2001 From: Filip Date: Tue, 16 Feb 2021 21:18:04 +0100 Subject: [PATCH 6/9] Add shutdown logs for components --- yajsapi/executor/index.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/yajsapi/executor/index.ts b/yajsapi/executor/index.ts index 90f1956f5..57337cf6f 100755 --- a/yajsapi/executor/index.ts +++ b/yajsapi/executor/index.ts @@ -322,6 +322,7 @@ export class Executor implements ComputationHistory { break; } } + logger.debug("Stopped processing invoices."); } async function accept_payment_for_agreement({ @@ -369,6 +370,7 @@ export class Executor implements ComputationHistory { break; } } + logger.debug("Stopped processing debit notes."); } async function find_offers(): Promise { @@ -492,6 +494,7 @@ export class Executor implements ComputationHistory { } } }); + logger.debug("Stopped checking and scoring new offers."); } let storage_manager = await this._stack.enter_async_context( @@ -641,6 +644,7 @@ export class Executor implements ComputationHistory { ); } ); + logger.debug(`Stopped worker related to agreement ${agreement.id()}.`); } async function worker_starter(): Promise { @@ -709,6 +713,7 @@ export class Executor implements ComputationHistory { } } } + logger.debug("Stopped starting new tasks on providers."); } async function promise_timeout(seconds: number) { From b718fe3fd3132f4d4817fb3b0dea3a5d011a6b2d Mon Sep 17 00:00:00 2001 From: Filip Date: Tue, 16 Feb 2021 23:13:59 +0100 Subject: [PATCH 7/9] Fix shutdown (fix paymentCancellationToken usage, close work_queue) --- yajsapi/executor/index.ts | 13 ++++++++----- yajsapi/executor/smartq.ts | 4 +++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/yajsapi/executor/index.ts b/yajsapi/executor/index.ts index 57337cf6f..7d7959bf6 100755 --- a/yajsapi/executor/index.ts +++ b/yajsapi/executor/index.ts @@ -322,6 +322,9 @@ export class Executor implements ComputationHistory { break; } } + if (!paymentCancellationToken.cancelled) { + paymentCancellationToken.cancel(); + } logger.debug("Stopped processing invoices."); } @@ -351,7 +354,7 @@ export class Executor implements ComputationHistory { /* TODO Consider processing invoices and debit notes together */ async function process_debit_notes(): Promise { for await (let debit_note of self._payment_api.incoming_debit_notes( - cancellationToken + paymentCancellationToken )) { if (agreements_to_pay.has(debit_note.agreementId)) { emit(new events.DebitNoteReceived({ @@ -741,7 +744,7 @@ export class Executor implements ComputationHistory { ]; try { while (services.indexOf(wait_until_done) > -1 || !done_queue.empty()) { - if (cancellationToken.cancelled) { done_queue.close(); } + if (cancellationToken.cancelled) { work_queue.close(); done_queue.close(); break; } const now = dayjs.utc(); if (now > this._expires) { throw new TimeoutError( @@ -812,12 +815,12 @@ export class Executor implements ComputationHistory { logger.error(error); } await bluebird.Promise.any([ - bluebird.Promise.all([find_offers_task, process_invoices_job]), - promise_timeout(10), + bluebird.Promise.all([process_invoices_job, debit_notes_job]), + promise_timeout(20), ]); emit(new events.CheckingPayments()); if (agreements_to_pay.size > 0) { - await bluebird.Promise.any([process_invoices_job, promise_timeout(15)]); + await bluebird.Promise.any([process_invoices_job, debit_notes_job, promise_timeout(15)]); emit(new events.CheckingPayments()); } } diff --git a/yajsapi/executor/smartq.ts b/yajsapi/executor/smartq.ts index dc2b44382..667c2a730 100644 --- a/yajsapi/executor/smartq.ts +++ b/yajsapi/executor/smartq.ts @@ -51,9 +51,11 @@ export class SmartQueue { if (this._items) this._items.length = 0; this._rescheduled_items.clear(); this._in_progress.clear(); + this.__done = true; + csp.putAsync(this.__eof, true); + csp.putAsync(this.__new_items, true); this.__new_items.close(); this.__eof.close(); - this.__done = true; } new_consumer(): Consumer { From 48a101776bed17c9856b9418193de626a87a485d Mon Sep 17 00:00:00 2001 From: Filip Date: Wed, 17 Feb 2021 00:32:17 +0100 Subject: [PATCH 8/9] Shutdown message --- yajsapi/executor/index.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/yajsapi/executor/index.ts b/yajsapi/executor/index.ts index 7d7959bf6..2e5c884e9 100755 --- a/yajsapi/executor/index.ts +++ b/yajsapi/executor/index.ts @@ -828,7 +828,10 @@ export class Executor implements ComputationHistory { self._payment_cancellation_token.cancel(); emit(new events.PaymentsFinished()); await sleep(2); + logger.info("Shutting down..."); cancellationToken.cancel(); + await sleep(15); + logger.info("Shutdown complete."); return; } From d781765fccfa033da09041fe01f852ed51b0d8ed Mon Sep 17 00:00:00 2001 From: Filip Date: Wed, 17 Feb 2021 00:36:02 +0100 Subject: [PATCH 9/9] Improve message --- yajsapi/rest/market.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yajsapi/rest/market.ts b/yajsapi/rest/market.ts index 8e74ce6ae..711ee515d 100755 --- a/yajsapi/rest/market.ts +++ b/yajsapi/rest/market.ts @@ -78,7 +78,7 @@ export class Agreement { async terminate(reason: string = "Finished"): Promise { try { await this._api.terminateAgreement(this._id, { message: reason }, { timeout: 5000 }); - logger.debug(`terminateAgreement(${this._id}) returned successfully`); + logger.debug(`Terminated agreement ${this._id}.`); return true; } catch (error) { if (error.response.status === 410) {