Skip to content

Commit

Permalink
Merge pull request #128 from golemfactory/b0.3-fixes
Browse files Browse the repository at this point in the history
B0.3 fixes
  • Loading branch information
shadeofblue authored Feb 18, 2021
2 parents 289a25a + d781765 commit b910198
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 37 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "yajsapi",
"version": "0.3.0",
"version": "0.3.1-alpha.1",
"description": "NodeJS API for Next Golem",
"repository": "https://github.com/golemfactory/yajsapi",
"main": "dist/index.js",
Expand Down
54 changes: 43 additions & 11 deletions yajsapi/executor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ export const sgx = _sgx;
export const vm = _vm;
import { Task, TaskStatus } from "./task";
import { Consumer, SmartQueue } from "./smartq";
import { LeastExpensiveLinearPayuMS, MarketStrategy, SCORE_NEUTRAL } from "./strategy";
import {
ComputationHistory,
DecreaseScoreForUnconfirmedAgreement,
LeastExpensiveLinearPayuMS,
MarketStrategy,
SCORE_NEUTRAL
} from "./strategy";
import { Package } from "../package";

export { Task, TaskStatus };
Expand Down Expand Up @@ -116,7 +122,7 @@ export type ExecutorOpts = {
*
* @description Used to run tasks using the specified application package within providers' execution units.
*/
export class Executor {
export class Executor implements ComputationHistory {
private _subnet;
private _driver;
private _network;
Expand All @@ -129,6 +135,7 @@ export class Executor {
private _expires;
private _budget_amount;
private _budget_allocations: Allocation[];
private _rejecting_providers: Set<string>;

private _activity_api;
private _market_api;
Expand All @@ -137,6 +144,7 @@ export class Executor {
private _wrapped_consumer;
private _cancellation_token: CancellationToken;
private _worker_cancellation_token: CancellationToken;
private _payment_cancellation_token: CancellationToken;

/**
* Create new executor
Expand All @@ -156,7 +164,7 @@ export class Executor {
max_workers = 5,
timeout = dayjs.duration({ minutes: 5 }).asMilliseconds(),
budget,
strategy = new LeastExpensiveLinearPayuMS(),
strategy,
subnet_tag,
driver,
network,
Expand All @@ -166,21 +174,25 @@ export class Executor {
this._driver = driver ? driver.toLowerCase() : DEFAULT_DRIVER;
this._network = network ? network.toLowerCase() : DEFAULT_NETWORK;
this._stream_output = false;
this._strategy = strategy;
this._strategy =
strategy || new DecreaseScoreForUnconfirmedAgreement(new LeastExpensiveLinearPayuMS(), 0.5);
this._api_config = new rest.Configuration();
this._stack = new AsyncExitStack();
this._task_package = task_package;
this._conf = new _ExecutorConfig(max_workers, timeout);
// TODO: setup precision
this._budget_amount = parseFloat(budget);
this._budget_allocations = [];
this._rejecting_providers = new Set();

this._cancellation_token = new CancellationToken();
let cancellationToken = this._cancellation_token;

this._worker_cancellation_token = new CancellationToken();
let workerCancellationToken = this._worker_cancellation_token;

this._payment_cancellation_token = new CancellationToken();

function cancel(event) {
if (cancellationToken && !cancellationToken.cancelled) {
cancellationToken.cancel();
Expand Down Expand Up @@ -247,6 +259,7 @@ export class Executor {
let activity_api = this._activity_api;
let strategy = this._strategy;
let cancellationToken = this._cancellation_token;
let paymentCancellationToken = this._payment_cancellation_token;
let done_queue: Queue<Task<D, R>> = new Queue([]);
let stream_output = this._stream_output;

Expand Down Expand Up @@ -277,7 +290,7 @@ export class Executor {

async function process_invoices(): Promise<void> {
for await (let invoice of self._payment_api.incoming_invoices(
cancellationToken
paymentCancellationToken
)) {
if (agreements_to_pay.has(invoice.agreementId)) {
emit(
Expand Down Expand Up @@ -309,6 +322,10 @@ export class Executor {
break;
}
}
if (!paymentCancellationToken.cancelled) {
paymentCancellationToken.cancel();
}
logger.debug("Stopped processing invoices.");
}

async function accept_payment_for_agreement({
Expand Down Expand Up @@ -337,7 +354,7 @@ export class Executor {
/* TODO Consider processing invoices and debit notes together */
async function process_debit_notes(): Promise<void> {
for await (let debit_note of self._payment_api.incoming_debit_notes(
cancellationToken
paymentCancellationToken
)) {
if (agreements_to_pay.has(debit_note.agreementId)) {
emit(new events.DebitNoteReceived({
Expand All @@ -356,6 +373,7 @@ export class Executor {
break;
}
}
logger.debug("Stopped processing debit notes.");
}

async function find_offers(): Promise<void> {
Expand Down Expand Up @@ -389,7 +407,7 @@ export class Executor {
offers_collected += 1;
let score;
try {
score = await strategy.score_offer(proposal);
score = await strategy.score_offer(proposal, self);
logger.debug(`Scored offer ${proposal.id()}, ` +
`provider: ${proposal.props()["golem.node.id.name"]}, ` +
`strategy: ${strategy.constructor.name}, ` +
Expand Down Expand Up @@ -479,6 +497,7 @@ export class Executor {
}
}
});
logger.debug("Stopped checking and scoring new offers.");
}

let storage_manager = await this._stack.enter_async_context(
Expand Down Expand Up @@ -628,6 +647,7 @@ export class Executor {
);
}
);
logger.debug(`Stopped worker related to agreement ${agreement.id()}.`);
}

async function worker_starter(): Promise<void> {
Expand Down Expand Up @@ -677,8 +697,10 @@ export class Executor {
if (self._worker_cancellation_token.cancelled) { break; }
if (!(await agreement.confirm())) {
emit(new events.AgreementRejected({ agr_id: agreement.id() }));
self._rejecting_providers.add(provider_id);
continue;
}
self._rejecting_providers.delete(provider_id);
emit(new events.AgreementConfirmed({ agr_id: agreement.id() }));
if (self._worker_cancellation_token.cancelled) { break; }
new_task = loop.create_task(_start_worker.bind(null, agreement));
Expand All @@ -694,6 +716,7 @@ export class Executor {
}
}
}
logger.debug("Stopped starting new tasks on providers.");
}

async function promise_timeout(seconds: number) {
Expand Down Expand Up @@ -721,7 +744,7 @@ export class Executor {
];
try {
while (services.indexOf(wait_until_done) > -1 || !done_queue.empty()) {
if (cancellationToken.cancelled) { done_queue.close(); }
if (cancellationToken.cancelled) { work_queue.close(); done_queue.close(); break; }
const now = dayjs.utc();
if (now > this._expires) {
throw new TimeoutError(
Expand Down Expand Up @@ -792,18 +815,23 @@ export class Executor {
logger.error(error);
}
await bluebird.Promise.any([
bluebird.Promise.all([find_offers_task, process_invoices_job]),
promise_timeout(10),
bluebird.Promise.all([process_invoices_job, debit_notes_job]),
promise_timeout(20),
]);
emit(new events.CheckingPayments());
if (agreements_to_pay.size > 0) {
await bluebird.Promise.any([process_invoices_job, promise_timeout(15)]);
await bluebird.Promise.any([process_invoices_job, debit_notes_job, promise_timeout(15)]);
emit(new events.CheckingPayments());
}
}
if (!self._payment_cancellation_token.cancelled)
self._payment_cancellation_token.cancel();
emit(new events.PaymentsFinished());
await sleep(2);
logger.info("Shutting down...");
cancellationToken.cancel();
await sleep(15);
logger.info("Shutdown complete.");
return;
}

Expand Down Expand Up @@ -873,6 +901,10 @@ export class Executor {
}
}

rejected_last_agreement(provider_id: string): boolean {
return this._rejecting_providers.has(provider_id);
}

async ready(): Promise<Executor> {
let stack = this._stack;
// TODO: Cleanup on exception here.
Expand Down
4 changes: 3 additions & 1 deletion yajsapi/executor/smartq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ export class SmartQueue<Item> {
if (this._items) this._items.length = 0;
this._rescheduled_items.clear();
this._in_progress.clear();
this.__done = true;
csp.putAsync(this.__eof, true);
csp.putAsync(this.__new_items, true);
this.__new_items.close();
this.__eof.close();
this.__done = true;
}

new_consumer(): Consumer<Item> {
Expand Down
39 changes: 36 additions & 3 deletions yajsapi/executor/strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ export const CFF_DEFAULT_PRICE_FOR_COUNTER: Map<Counter, number> = new Map([
[Counter.CPU, parseFloat("0.002") * 10],
]);

export interface ComputationHistory {
rejected_last_agreement: (string) => boolean;
}

export class MarketStrategy {
/*Abstract market strategy*/

async decorate_demand(demand: DemandBuilder): Promise<void> {}

async score_offer(offer: OfferProposal): Promise<Number> {
async score_offer(offer: OfferProposal, history?: ComputationHistory): Promise<Number> {
return SCORE_REJECTED;
}
}
Expand All @@ -44,7 +48,7 @@ export class DummyMS extends MarketGeneral {
this._activity = new Activity().from_properties(demand._properties);
}

async score_offer(offer: OfferProposal): Promise<Number> {
async score_offer(offer: OfferProposal, history?: ComputationHistory): Promise<Number> {
const linear: ComLinear = new ComLinear().from_properties(offer.props());

if (linear.scheme.value !== BillingScheme.PAYU) {
Expand Down Expand Up @@ -73,7 +77,7 @@ export class LeastExpensiveLinearPayuMS {
demand.ensure(`(${PRICE_MODEL}=${PriceModel.LINEAR})`);
}

async score_offer(offer: OfferProposal): Promise<Number> {
async score_offer(offer: OfferProposal, history?: ComputationHistory): Promise<Number> {
const linear: ComLinear = new ComLinear().from_properties(offer.props());

logger.debug(`Scoring offer ${offer.id()}, parameters: ${JSON.stringify(linear)}`);
Expand Down Expand Up @@ -111,3 +115,32 @@ export class LeastExpensiveLinearPayuMS {
return score;
}
}

export class DecreaseScoreForUnconfirmedAgreement {
/* A market strategy that modifies a base strategy based on history of agreements. */

private _base_strategy;
private _factor;

constructor(base_strategy, factor) {
this._base_strategy = base_strategy;
this._factor = factor;
}

async decorate_demand(demand: DemandBuilder): Promise<void> {
/* Decorate `demand` using the base strategy. */
await this._base_strategy.decorate_demand(demand);
}

async score_offer(offer: OfferProposal, history?: ComputationHistory): Promise<Number> {
/* Score `offer` using the base strategy and apply penalty if needed.
If the offer issuer failed to approve the previous agreement (if any)
then the base score is multiplied by `this._factor`. */
let score = await this._base_strategy.score_offer(offer);
if (history && history.rejected_last_agreement(offer.issuer()) && score > 0) {
score *= this._factor;
logger.debug(`Decreasing score for offer ${offer.id()} from '${offer.issuer()}'`);
}
return score;
}
}
5 changes: 4 additions & 1 deletion yajsapi/rest/activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -426,12 +426,15 @@ class PollingBatch extends Batch {
this.activity_id,
this.batch_id,
undefined,
{ params: { timeout: 5 } }
{ timeout: 5000 }
);
results = data;
} catch (error) {
const timeout_msg = error.message && error.message.includes("timeout");
if (error.response && error.response.status === 408) {
continue;
} else if (error.code === "ETIMEDOUT" || (error.code === "ECONNABORTED" && timeout_msg)) {
continue;
} else {
if (error.response && error.response.status == 500 && error.response.data) {
throw new CommandExecutionError(
Expand Down
4 changes: 2 additions & 2 deletions yajsapi/rest/market.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class Agreement {
async confirm(): Promise<boolean> {
await this._api.confirmAgreement(this._id);
try {
let { data: msg } = await this._api.waitForApproval(this._id, 90, { timeout: 100000 });
let { data: msg } = await this._api.waitForApproval(this._id, 15, { timeout: 16000 });
return true;
} catch (error) {
logger.debug(`waitForApproval(${this._id}) raised ApiException ${error}`);
Expand All @@ -78,7 +78,7 @@ export class Agreement {
async terminate(reason: string = "Finished"): Promise<boolean> {
try {
await this._api.terminateAgreement(this._id, { message: reason }, { timeout: 5000 });
logger.debug(`terminateAgreement(${this._id}) returned successfully`);
logger.debug(`Terminated agreement ${this._id}.`);
return true;
} catch (error) {
if (error.response.status === 410) {
Expand Down
3 changes: 2 additions & 1 deletion yajsapi/rest/payment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ class _AllocationTask extends ResourceCtx<Allocation> {
_allocation.expires = new Date(parseInt(model.timeout) * 1000);
return _allocation;
} catch (error) {
logger.error(error);
const msg = error.response && error.response.data ? error.response.data.message : error.message;
logger.error(`Payment allocation error (message: ${msg}). Please run "yagna payment status" to check your account.`);
throw new Error(error);
}
}
Expand Down
28 changes: 11 additions & 17 deletions yajsapi/utils/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,18 +231,13 @@ class SummaryLogger {
);
} else if (eventName === events.CommandExecuted.name) {
const provider_name = this.agreement_provider_info[event["agr_id"]].name;
const cmd = JSON.stringify(event["command"]);
if (event["success"]) {
logger.debug(
`Command successful on provider '${provider_name}', command: ${JSON.stringify(
event["command"]
)}.`
);
logger.debug(`Command successful on provider '${provider_name}', command: ${cmd}.`);
logger.silly(`Command ${cmd}: stdout: ${event["stdout"]}, stderr: ${event["stderr"]}, msg: ${event["message"]}.`);
} else {
logger.warn(
`Command failed on provider '${provider_name}', command: ${JSON.stringify(
event["command"]
)}, output: ${event["message"]}`
);
logger.warn(`Command failed on provider '${provider_name}', command: ${cmd}, msg: ${event["message"]}`);
logger.debug(`Command ${cmd}: stdout: ${event["stdout"]}, stderr: ${event["stderr"]}.`);
}
} else if (eventName === events.ScriptFinished.name) {
const provider_info = this.agreement_provider_info[event["agr_id"]];
Expand Down Expand Up @@ -353,22 +348,21 @@ export function logSummary(

export const changeLogLevel = (level: string) => {
options.level = level;
options.transports.push(
options.transports = [
new winston.transports.Console({ level: level }),
new winston.transports.File({
filename: path.join(
"logs",
`yajsapi-${dayjs().format("YYYY-MM-DD_HH-mm-ss")}.log`
),
level: "debug",
}) as any
);
options.transports.push(
level: "silly",
}) as any,
new winston.transports.File({
filename: path.join("logs", "yajsapi-current.log"),
level: "debug",
level: "silly",
options: { flags: "w" },
}) as any
);
];
logger.configure(options);
};

Expand Down

0 comments on commit b910198

Please sign in to comment.