From 5c3f2663cce1d50c354eeb43255fc385a0153547 Mon Sep 17 00:00:00 2001 From: Muhammed Tanrikulu Date: Wed, 18 Nov 2020 21:43:00 +0100 Subject: [PATCH] fix consumer flow with asyncWith wrapper, fix logs --- yajsapi/rest/activity.ts | 7 +- yajsapi/runner/events.ts | 14 +++- yajsapi/runner/index.ts | 166 +++++++++++++++++++------------------ yajsapi/runner/smartq.ts | 6 +- yajsapi/runner/task.ts | 14 +++- yajsapi/utils/asyncWith.ts | 4 +- yajsapi/utils/log.ts | 13 ++- 7 files changed, 126 insertions(+), 98 deletions(-) diff --git a/yajsapi/rest/activity.ts b/yajsapi/rest/activity.ts index 9c718a073..e3aa742ce 100755 --- a/yajsapi/rest/activity.ts +++ b/yajsapi/rest/activity.ts @@ -201,8 +201,13 @@ class Activity { } } catch (error) { logger.error(`Failed to destroy activity: ${this._id}`); + } finally { + try { + await this._api.destroyActivity(this._id); + } catch (error) { + //suppress api error + } } - await this._api.destroyActivity(this._id); } } diff --git a/yajsapi/runner/events.ts b/yajsapi/runner/events.ts index d67aaf424..ab9cd5c10 100644 --- a/yajsapi/runner/events.ts +++ b/yajsapi/runner/events.ts @@ -98,8 +98,8 @@ export class NoProposalsConfirmed extends YaEvent { constructor({ num_offers, timeout }) { super(); - if (num_offers) this.num_offers = num_offers; - if (timeout) this.timeout = timeout; + this.num_offers = num_offers; + this.timeout = timeout; } } @@ -277,10 +277,20 @@ export class ScriptFinished extends ScriptEvent { export class TaskAccepted extends TaskEvent { result?: any; + constructor({ task_id, result }) { + super(); + if (task_id) this.task_id = task_id; + if (result) this.result = result; + } } export class TaskRejected extends TaskEvent { reason?: string | null; + constructor({ task_id, reason }) { + super(); + if (task_id) this.task_id = task_id; + if (reason) this.reason = reason; + } } export class DownloadStarted extends YaEvent { diff --git a/yajsapi/runner/index.ts b/yajsapi/runner/index.ts index 12af4df20..47aa26e01 100755 --- a/yajsapi/runner/index.ts +++ b/yajsapi/runner/index.ts @@ -258,7 +258,7 @@ export class Engine { builder.ensure(constraint); } for (let x of multi_payment_decoration.properties) { - builder._props[x.key] = x.value; + builder._props[x.key] = x.value; } await this._demand_decor.decorate_demand(builder); await this._strategy.decorate_demand(builder); @@ -455,7 +455,7 @@ export class Engine { let _act; try { - _act = await activity_api.create_activity(agreement, secure) + _act = await activity_api.create_activity(agreement, secure); } catch (error) { emit(new events.ActivityCreateFailed({ agr_id: agreement.id() })); throw error; @@ -484,98 +484,104 @@ export class Engine { storage_manager, emit ); - let consumer = work_queue.new_consumer(); - - let command_generator = worker(work_context, task_emitter(consumer)); - for await (let batch of command_generator) { - try { - let current_worker_task = consumer.last_item(); - if (current_worker_task) { + await asyncWith(work_queue.new_consumer(), async (consumer) => { + let command_generator = worker( + work_context, + task_emitter(consumer) + ); + for await (let batch of command_generator) { + try { + let current_worker_task = consumer.last_item(); + if (current_worker_task) { + emit( + new events.TaskStarted({ + agr_id: agreement.id(), + task_id: current_worker_task.id, + task_data: current_worker_task.data(), + }) + ); + } + let task_id = current_worker_task + ? current_worker_task.id + : null; + batch.attestation = { + credentials: act.credentials, + nonce: act.id, + exeunitHashes: act.exeunitHashes, + }; + await batch.prepare(); + let cc = new CommandContainer(); + batch.register(cc); + let remote = await act.exec(cc.commands()); emit( - new events.TaskStarted({ + new events.ScriptSent({ agr_id: agreement.id(), - task_id: current_worker_task.id, - task_data: current_worker_task.data(), + task_id: task_id, + cmds: cc.commands(), }) ); - } - let task_id = current_worker_task ? current_worker_task.id : null; - batch.attestation = { - credentials: act.credentials, - nonce: act.id, - exeunitHashes: act.exeunitHashes - }; - await batch.prepare(); - let cc = new CommandContainer(); - batch.register(cc); - let remote = await act.exec(cc.commands()); - emit( - new events.ScriptSent({ - agr_id: agreement.id(), - task_id: task_id, - cmds: cc.commands(), - }) - ); - try { - for await (let step of remote) { - batch.output.push(step); + try { + for await (let step of remote) { + batch.output.push(step); + emit( + new events.CommandExecuted({ + success: true, + agr_id: agreement.id(), + task_id: task_id, + command: cc.commands()[step.idx], + message: step.message, + cmd_idx: step.idx, + }) + ); + } + } catch (error) { + // assert len(err.args) >= 2 + const { name: cmd_idx , description } = error; + //throws new CommandExecutionError from activity#355 emit( new events.CommandExecuted({ - success: true, + success: false, agr_id: agreement.id(), task_id: task_id, - command: cc.commands()[step.idx], - message: step.message, - cmd_idx: step.idx, + command: cc.commands()[cmd_idx], + message: description, + cmd_idx: cmd_idx, }) ); + throw error; } - } catch (error) { - // assert len(err.args) >= 2 - const [cmd_msg, cmd_idx] = error; emit( - new events.CommandExecuted({ - success: false, + new events.GettingResults({ agr_id: agreement.id(), task_id: task_id, - command: cc.commands()[cmd_idx], - message: cmd_msg, - cmd_idx: cmd_idx, }) ); - throw error; - } - emit( - new events.GettingResults({ - agr_id: agreement.id(), - task_id: task_id, - }) - ); - await batch.post(); - emit( - new events.ScriptFinished({ - agr_id: agreement.id(), - task_id: task_id, - }) - ); - await accept_payment_for_agreement({ - agreement_id: agreement.id(), - partial: true, - }); - } catch (error) { - try { - // await command_generator.athrow(*sys.exc_info()) - } catch (error) { + await batch.post(); emit( - new events.WorkerFinished({ + new events.ScriptFinished({ agr_id: agreement.id(), - exception: [error], + task_id: task_id, }) ); - return; + await accept_payment_for_agreement({ + agreement_id: agreement.id(), + partial: true, + }); + } catch (error) { + try { + await command_generator.throw(error) + } catch (error) { + emit( + new events.WorkerFinished({ + agr_id: agreement.id(), + exception: [error], + }) + ); + return; + } } } - } + }); await accept_payment_for_agreement({ agreement_id: agreement.id(), partial: false, @@ -760,19 +766,21 @@ export class Engine { } _get_common_payment_platforms(proposal: OfferProposal): string[] { - let prov_platforms = Object.keys(proposal.props()).filter((prop) => { - return prop.startsWith("golem.com.payment.platform.") - }).map((prop) => { - return prop.split(".")[4]; - }); + let prov_platforms = Object.keys(proposal.props()) + .filter((prop) => { + return prop.startsWith("golem.com.payment.platform."); + }) + .map((prop) => { + return prop.split(".")[4]; + }); if (!prov_platforms) { prov_platforms = ["NGNT"]; } const req_platforms = this._budget_allocations.map( (budget_allocation) => budget_allocation.payment_platform ); - return req_platforms.filter((value) => - value && prov_platforms.includes(value) + return req_platforms.filter( + (value) => value && prov_platforms.includes(value) ) as string[]; } diff --git a/yajsapi/runner/smartq.ts b/yajsapi/runner/smartq.ts index a087cad2a..b103f7312 100644 --- a/yajsapi/runner/smartq.ts +++ b/yajsapi/runner/smartq.ts @@ -153,12 +153,12 @@ export class Consumer { this._fetched = null; } - __enter__(): Consumer { + ready(): Consumer { return this; } - __exit__() { - eventLoop().create_task(this._queue.reschedule_all(this)); + done() { + eventLoop().create_task(this._queue.reschedule_all.bind(this._queue, this)); return null; } diff --git a/yajsapi/runner/task.ts b/yajsapi/runner/task.ts index 6e894ea51..415ea3d4f 100644 --- a/yajsapi/runner/task.ts +++ b/yajsapi/runner/task.ts @@ -91,18 +91,24 @@ export class Task { accept_task(result: TaskResult | null = null): void { if (this._emit_event) { - this._emit_event("task", "accept", null, result); + this._emit_event(new events.TaskAccepted({task_id: this.id, result})); } if (this._status != TaskStatus.RUNNING) throw "Accepted task not running"; this._status = TaskStatus.ACCEPTED; this._result = result; this._stop(); - for (let cb of this._callbacks) cb && cb(this, "accept"); + for (let cb of this._callbacks) cb && cb(this, TaskStatus.ACCEPTED); } - reject_task(): void { - if (this._status != TaskStatus.RUNNING) throw ""; + reject_task(reason: string | null = null, retry: boolean = false): void { + if (this._emit_event) { + this._emit_event(new events.TaskRejected({task_id: this.id, reason})); + } + if (this._status != TaskStatus.RUNNING) throw "Rejected task not running"; this._status = TaskStatus.REJECTED; + this._stop(retry) + + for (let cb of this._callbacks) cb && cb(self, TaskStatus.REJECTED) } static get counter(): number { diff --git a/yajsapi/utils/asyncWith.ts b/yajsapi/utils/asyncWith.ts index f3d93bddb..f23ff4718 100644 --- a/yajsapi/utils/asyncWith.ts +++ b/yajsapi/utils/asyncWith.ts @@ -1,6 +1,6 @@ import { logger } from "./"; export default async function asyncWith(expression, block) { - let mgr = expression ? await expression.ready() : null; + let mgr = expression ? await expression.ready.call(expression) : null; try { await block(mgr); } catch (error) { @@ -8,5 +8,5 @@ export default async function asyncWith(expression, block) { console.log(); logger.error(`${message}\n\n${stack}\n`); } - await expression.done(mgr); + await expression.done.call(expression, mgr); } diff --git a/yajsapi/utils/log.ts b/yajsapi/utils/log.ts index 79a93f632..e112dd890 100644 --- a/yajsapi/utils/log.ts +++ b/yajsapi/utils/log.ts @@ -188,8 +188,7 @@ class SummaryLogger { else msg = `${ event["num_offers"] - } offers have been collected from the market, but - no provider has responded for ${this.time_waiting_for_proposals.asSeconds()}s. `; + } offers have been collected from the market, but no provider has responded for ${this.time_waiting_for_proposals.asSeconds()}s. `; msg += "Make sure you're using the latest released versions of yagna and yajsapi, and the correct subnet."; logger.warn(msg); @@ -223,7 +222,7 @@ class SummaryLogger { if (event["success"]) return; const provider_name = this.agreement_provider_name[event["agr_id"]]; logger.warn( - `Command failed on provider '${provider_name}', command: ${event["command"]}, output: ${event["message"]}` + `Command failed on provider '${provider_name}', command: ${JSON.stringify(event["command"])}, output: ${event["message"]}` ); } else if (eventName === events.ScriptFinished.name) { const provider_name = this.agreement_provider_name[event["agr_id"]]; @@ -300,9 +299,9 @@ function isSuperset(set: Set, subset: Set) { return true; } -export const changeLogLevel = ((level: string) => { - options.level = level; - logger.configure(options); -}); +export const changeLogLevel = (level: string) => { + options.level = level; + logger.configure(options); +}; export default logger;