Skip to content

Commit

Permalink
fix consumer flow with asyncWith wrapper, fix logs
Browse files Browse the repository at this point in the history
  • Loading branch information
mdtanrikulu committed Nov 18, 2020
1 parent fcf099d commit 5c3f266
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 98 deletions.
7 changes: 6 additions & 1 deletion yajsapi/rest/activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,13 @@ class Activity {
}
} catch (error) {
logger.error(`Failed to destroy activity: ${this._id}`);
} finally {
try {
await this._api.destroyActivity(this._id);
} catch (error) {
//suppress api error
}
}
await this._api.destroyActivity(this._id);
}
}

Expand Down
14 changes: 12 additions & 2 deletions yajsapi/runner/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ export class NoProposalsConfirmed extends YaEvent {

constructor({ num_offers, timeout }) {
super();
if (num_offers) this.num_offers = num_offers;
if (timeout) this.timeout = timeout;
this.num_offers = num_offers;
this.timeout = timeout;
}
}

Expand Down Expand Up @@ -277,10 +277,20 @@ export class ScriptFinished extends ScriptEvent {

export class TaskAccepted extends TaskEvent {
result?: any;
constructor({ task_id, result }) {
super();
if (task_id) this.task_id = task_id;
if (result) this.result = result;
}
}

export class TaskRejected extends TaskEvent {
reason?: string | null;
constructor({ task_id, reason }) {
super();
if (task_id) this.task_id = task_id;
if (reason) this.reason = reason;
}
}

export class DownloadStarted extends YaEvent {
Expand Down
166 changes: 87 additions & 79 deletions yajsapi/runner/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ export class Engine {
builder.ensure(constraint);
}
for (let x of multi_payment_decoration.properties) {
builder._props[x.key] = x.value;
builder._props[x.key] = x.value;
}
await this._demand_decor.decorate_demand(builder);
await this._strategy.decorate_demand(builder);
Expand Down Expand Up @@ -455,7 +455,7 @@ export class Engine {

let _act;
try {
_act = await activity_api.create_activity(agreement, secure)
_act = await activity_api.create_activity(agreement, secure);
} catch (error) {
emit(new events.ActivityCreateFailed({ agr_id: agreement.id() }));
throw error;
Expand Down Expand Up @@ -484,98 +484,104 @@ export class Engine {
storage_manager,
emit
);
let consumer = work_queue.new_consumer();

let command_generator = worker(work_context, task_emitter(consumer));
for await (let batch of command_generator) {
try {
let current_worker_task = consumer.last_item();
if (current_worker_task) {
await asyncWith(work_queue.new_consumer(), async (consumer) => {
let command_generator = worker(
work_context,
task_emitter(consumer)
);
for await (let batch of command_generator) {
try {
let current_worker_task = consumer.last_item();
if (current_worker_task) {
emit(
new events.TaskStarted({
agr_id: agreement.id(),
task_id: current_worker_task.id,
task_data: current_worker_task.data(),
})
);
}
let task_id = current_worker_task
? current_worker_task.id
: null;
batch.attestation = {
credentials: act.credentials,
nonce: act.id,
exeunitHashes: act.exeunitHashes,
};
await batch.prepare();
let cc = new CommandContainer();
batch.register(cc);
let remote = await act.exec(cc.commands());
emit(
new events.TaskStarted({
new events.ScriptSent({
agr_id: agreement.id(),
task_id: current_worker_task.id,
task_data: current_worker_task.data(),
task_id: task_id,
cmds: cc.commands(),
})
);
}
let task_id = current_worker_task ? current_worker_task.id : null;
batch.attestation = {
credentials: act.credentials,
nonce: act.id,
exeunitHashes: act.exeunitHashes
};
await batch.prepare();
let cc = new CommandContainer();
batch.register(cc);
let remote = await act.exec(cc.commands());
emit(
new events.ScriptSent({
agr_id: agreement.id(),
task_id: task_id,
cmds: cc.commands(),
})
);
try {
for await (let step of remote) {
batch.output.push(step);
try {
for await (let step of remote) {
batch.output.push(step);
emit(
new events.CommandExecuted({
success: true,
agr_id: agreement.id(),
task_id: task_id,
command: cc.commands()[step.idx],
message: step.message,
cmd_idx: step.idx,
})
);
}
} catch (error) {
// assert len(err.args) >= 2
const { name: cmd_idx , description } = error;
//throws new CommandExecutionError from activity#355
emit(
new events.CommandExecuted({
success: true,
success: false,
agr_id: agreement.id(),
task_id: task_id,
command: cc.commands()[step.idx],
message: step.message,
cmd_idx: step.idx,
command: cc.commands()[cmd_idx],
message: description,
cmd_idx: cmd_idx,
})
);
throw error;
}
} catch (error) {
// assert len(err.args) >= 2
const [cmd_msg, cmd_idx] = error;
emit(
new events.CommandExecuted({
success: false,
new events.GettingResults({
agr_id: agreement.id(),
task_id: task_id,
command: cc.commands()[cmd_idx],
message: cmd_msg,
cmd_idx: cmd_idx,
})
);
throw error;
}
emit(
new events.GettingResults({
agr_id: agreement.id(),
task_id: task_id,
})
);
await batch.post();
emit(
new events.ScriptFinished({
agr_id: agreement.id(),
task_id: task_id,
})
);
await accept_payment_for_agreement({
agreement_id: agreement.id(),
partial: true,
});
} catch (error) {
try {
// await command_generator.athrow(*sys.exc_info())
} catch (error) {
await batch.post();
emit(
new events.WorkerFinished({
new events.ScriptFinished({
agr_id: agreement.id(),
exception: [error],
task_id: task_id,
})
);
return;
await accept_payment_for_agreement({
agreement_id: agreement.id(),
partial: true,
});
} catch (error) {
try {
await command_generator.throw(error)
} catch (error) {
emit(
new events.WorkerFinished({
agr_id: agreement.id(),
exception: [error],
})
);
return;
}
}
}
}
});
await accept_payment_for_agreement({
agreement_id: agreement.id(),
partial: false,
Expand Down Expand Up @@ -760,19 +766,21 @@ export class Engine {
}

_get_common_payment_platforms(proposal: OfferProposal): string[] {
let prov_platforms = Object.keys(proposal.props()).filter((prop) => {
return prop.startsWith("golem.com.payment.platform.")
}).map((prop) => {
return prop.split(".")[4];
});
let prov_platforms = Object.keys(proposal.props())
.filter((prop) => {
return prop.startsWith("golem.com.payment.platform.");
})
.map((prop) => {
return prop.split(".")[4];
});
if (!prov_platforms) {
prov_platforms = ["NGNT"];
}
const req_platforms = this._budget_allocations.map(
(budget_allocation) => budget_allocation.payment_platform
);
return req_platforms.filter((value) =>
value && prov_platforms.includes(value)
return req_platforms.filter(
(value) => value && prov_platforms.includes(value)
) as string[];
}

Expand Down
6 changes: 3 additions & 3 deletions yajsapi/runner/smartq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,12 @@ export class Consumer<Item> {
this._fetched = null;
}

__enter__(): Consumer<Item> {
ready(): Consumer<Item> {
return this;
}

__exit__() {
eventLoop().create_task(this._queue.reschedule_all(this));
done() {
eventLoop().create_task(this._queue.reschedule_all.bind(this._queue, this));
return null;
}

Expand Down
14 changes: 10 additions & 4 deletions yajsapi/runner/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,24 @@ export class Task<TaskData, TaskResult> {

accept_task(result: TaskResult | null = null): void {
if (this._emit_event) {
this._emit_event("task", "accept", null, result);
this._emit_event(new events.TaskAccepted({task_id: this.id, result}));
}
if (this._status != TaskStatus.RUNNING) throw "Accepted task not running";
this._status = TaskStatus.ACCEPTED;
this._result = result;
this._stop();
for (let cb of this._callbacks) cb && cb(this, "accept");
for (let cb of this._callbacks) cb && cb(this, TaskStatus.ACCEPTED);
}

reject_task(): void {
if (this._status != TaskStatus.RUNNING) throw "";
reject_task(reason: string | null = null, retry: boolean = false): void {
if (this._emit_event) {
this._emit_event(new events.TaskRejected({task_id: this.id, reason}));
}
if (this._status != TaskStatus.RUNNING) throw "Rejected task not running";
this._status = TaskStatus.REJECTED;
this._stop(retry)

for (let cb of this._callbacks) cb && cb(self, TaskStatus.REJECTED)
}

static get counter(): number {
Expand Down
4 changes: 2 additions & 2 deletions yajsapi/utils/asyncWith.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { logger } from "./";
export default async function asyncWith(expression, block) {
let mgr = expression ? await expression.ready() : null;
let mgr = expression ? await expression.ready.call(expression) : null;
try {
await block(mgr);
} catch (error) {
const { message, stack } = error;
console.log();
logger.error(`${message}\n\n${stack}\n`);
}
await expression.done(mgr);
await expression.done.call(expression, mgr);
}
13 changes: 6 additions & 7 deletions yajsapi/utils/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,7 @@ class SummaryLogger {
else
msg = `${
event["num_offers"]
} offers have been collected from the market, but
no provider has responded for ${this.time_waiting_for_proposals.asSeconds()}s. `;
} offers have been collected from the market, but no provider has responded for ${this.time_waiting_for_proposals.asSeconds()}s. `;
msg +=
"Make sure you're using the latest released versions of yagna and yajsapi, and the correct subnet.";
logger.warn(msg);
Expand Down Expand Up @@ -223,7 +222,7 @@ class SummaryLogger {
if (event["success"]) return;
const provider_name = this.agreement_provider_name[event["agr_id"]];
logger.warn(
`Command failed on provider '${provider_name}', command: ${event["command"]}, output: ${event["message"]}`
`Command failed on provider '${provider_name}', command: ${JSON.stringify(event["command"])}, output: ${event["message"]}`
);
} else if (eventName === events.ScriptFinished.name) {
const provider_name = this.agreement_provider_name[event["agr_id"]];
Expand Down Expand Up @@ -300,9 +299,9 @@ function isSuperset(set: Set<any>, subset: Set<any>) {
return true;
}

export const changeLogLevel = ((level: string) => {
options.level = level;
logger.configure(options);
});
export const changeLogLevel = (level: string) => {
options.level = level;
logger.configure(options);
};

export default logger;

0 comments on commit 5c3f266

Please sign in to comment.