Skip to content

Commit

Permalink
Repeat API calls on timeouts (#181)
Browse files Browse the repository at this point in the history
* Add repeat_on_error and suppress_exceptions functions
* Use suppress_exceptions function in incoming_invoices and incoming_debit_notes
* Use suppress_exceptions in Subscription.events()
* Use repeat_on_error() in Invoice.accept, DebitNote.accept, Payment.invoice, Payment.debit_note, Allocation.details and Allocation.delete
  • Loading branch information
filipgolem authored May 20, 2021
1 parent 609028e commit 25a3fe5
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 62 deletions.
2 changes: 1 addition & 1 deletion yajsapi/executor/ctx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ export class WorkContext {
*
* @returns Work object (the latter contains sequence commands added before calling this method)
*/
commit({ timeout }: { timeout?: number }): Work {
commit({ timeout }: { timeout?: number } = {}): Work {
let steps = this._pending_steps;
this._pending_steps = [];
return new _Steps(steps, timeout);
Expand Down
2 changes: 1 addition & 1 deletion yajsapi/rest/activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ class Activity {
try {
await this._api.destroyActivity(this._id, { timeout: 11000, params: { timeout: 10 } });
} catch (error) {
logger.error(`Got API Exception when destroying activity ${this._id}: ${error}`);
logger.warn(`Got API Exception when destroying activity ${this._id}: ${error}`);
}
}
}
Expand Down
56 changes: 56 additions & 0 deletions yajsapi/rest/common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { Callable, logger, sleep } from "../utils";

export function is_intermittent_error(e) {
if (e.response && (e.response.status === 408 || e.response.status === 504)) { return true; }
if (e.code === "ECONNABORTED" && e.message && e.message.includes("timeout")) { return true; }
if (e.code === "ETIMEDOUT") { return true; }
if (e.code === "EPIPE") { return true; }
return false;
}

export async function suppress_exceptions(
condition: Callable<Error, boolean>,
block: Callable<void, any>,
report_exceptions: boolean = true
): Promise<any> {
try {
return await block();
} catch (error) {
if (condition(error)) {
logger.debug(`Exception suppressed: ${error}`);
} else {
throw error;
}
}
}

export async function repeat_on_error(
block: Callable<void, any>,
max_tries: number = 5,
max_duration_ms = 15000,
interval: number = 0.0,
condition: Callable<Error, boolean> = is_intermittent_error
) {
let start_time = Date.now();
for (let try_num = 1; try_num <= max_tries; ++try_num) {
if (try_num > 1) {
await sleep(Math.min(interval, start_time + max_duration_ms - Date.now()));
}
let err_in_block, ret_value;
await suppress_exceptions(condition, async () => {
try {
ret_value = await block();
} catch (error) {
err_in_block = error;
throw error;
}
});
if (err_in_block === undefined) { return ret_value; }
const duration = Date.now() - start_time;
const repeat = try_num < max_tries && duration < max_duration_ms;
const msg = `API call timed out (attempt ${try_num}/${max_tries}), ` +
(repeat ? `retrying in ${interval}s` : "giving up after ${duration}ms");
logger.debug(msg);
if (!repeat) { throw err_in_block; }
}
}
14 changes: 7 additions & 7 deletions yajsapi/rest/market.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { logger, sleep } from "../utils";
import { RequestorApi } from "ya-ts-client/dist/ya-market/api";
import * as models from "ya-ts-client/dist/ya-market/src/models";
import { Configuration } from "ya-ts-client/dist/ya-activity";
import { suppress_exceptions, is_intermittent_error } from "./common";

dayjs.extend(utc);

Expand Down Expand Up @@ -237,20 +238,19 @@ export class Subscription {
async *events(cancellationToken?): AsyncGenerator<OfferProposal> {
while (this._open) {
if (cancellationToken && cancellationToken.cancelled) break;
let proposals: any[] = [];
try {
let { data: proposals } = await this._api.collectOffers(
this._id,
3,
10,
{ timeout: 5000 }
);
await suppress_exceptions(is_intermittent_error, async () => {
let { data } = await this._api.collectOffers(this._id, 3, 10, { timeout: 5000 });
proposals = data;
});
for (let _proposal of proposals) {
if (cancellationToken && cancellationToken.cancelled) return;
if (_proposal.eventType === "ProposalEvent") {
yield new OfferProposal(this, _proposal as models.ProposalEvent);
}
}
if (!proposals || !proposals.length) {
if (!proposals.length) {
await sleep(2);
}
} catch (error) {
Expand Down
109 changes: 56 additions & 53 deletions yajsapi/rest/payment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as yap from "ya-ts-client/dist/ya-payment/src/models";
import { Configuration } from "ya-ts-client/dist/ya-activity";
import { RequestorApi } from "ya-ts-client/dist/ya-payment/api";
import { logger, sleep } from "../utils";
import { is_intermittent_error, repeat_on_error, suppress_exceptions } from "./common";

dayjs.extend(utc);

Expand Down Expand Up @@ -58,7 +59,9 @@ export class Invoice extends yInvoice {
};
acceptance!.totalAmountAccepted = amount.toString();
acceptance!.allocationId = allocation.id;
await this._api.acceptInvoice(this.invoiceId, acceptance!, undefined, { timeout: 15000 });
await repeat_on_error(async () => {
await this._api.acceptInvoice(this.invoiceId, acceptance!, undefined, { timeout: 7000 });
});
}
}

Expand All @@ -81,7 +84,9 @@ export class DebitNote extends yDebitNote {
};
acceptance!.totalAmountAccepted = amount.toString();
acceptance!.allocationId = allocation.id;
await this._api.acceptDebitNote(this.debitNoteId, acceptance!, undefined, { timeout: 15000 });
await repeat_on_error(async () => {
await this._api.acceptDebitNote(this.debitNoteId, acceptance!, undefined, { timeout: 7000 });
});
}
}

Expand Down Expand Up @@ -113,27 +118,24 @@ export class Allocation extends _Link {
//"Allocation expiration timestamp"

async details(): Promise<AllocationDetails> {
let allocationDetails = new AllocationDetails();

try {
return await repeat_on_error(async () => {
let allocationDetails = new AllocationDetails();
let {
data: details,
}: { data: yap.Allocation } = await this._api.getAllocation(this.id, { timeout: 15000 });
}: { data: yap.Allocation } = await this._api.getAllocation(this.id, { timeout: 7000 });
allocationDetails.spent_amount = parseFloat(details.spentAmount);
allocationDetails.remaining_amount = parseFloat(details.remainingAmount);
} catch (error) {
logger.error(error);
throw new Error(error);
}

return allocationDetails;
return allocationDetails;
});
}

async delete() {
try {
await this._api.releaseAllocation(this.id, { timeout: 15000 });
await repeat_on_error(async () => {
await this._api.releaseAllocation(this.id, { timeout: 7000 });
});
} catch(error) {
logger.error(`Release allocation: ${error}`);
logger.error(`Release allocation error: ${error}`);
}
}
}
Expand Down Expand Up @@ -291,7 +293,9 @@ export class Payment {
}

async debit_note(debit_note_id: string): Promise<DebitNote> {
let { data: debit_note_obj } = await this._api.getDebitNote(debit_note_id, { timeout: 5000 });
let debit_note_obj = await repeat_on_error(async () => {
return (await this._api.getDebitNote(debit_note_id, { timeout: 5000 })).data;
});
// TODO may need to check only requestor debit notes
return new DebitNote(this._api, debit_note_obj)
}
Expand All @@ -307,9 +311,10 @@ export class Payment {
}

async invoice(invoice_id: string): Promise<Invoice> {
let { data: invoice_obj } = await this._api.getInvoice(invoice_id, { timeout: 5000 });
let invoice_obj = await repeat_on_error(async () => {
return (await this._api.getInvoice(invoice_id, { timeout: 5000 })).data;
});
// TODO may need to check only requestor invoices
// logger.log("debug", `got=${JSON.stringify(invoice_obj)}`);
return new Invoice(this._api, invoice_obj);
}

Expand All @@ -322,28 +327,27 @@ export class Payment {
let ts = init_ts;
while (true) {
if (cancellationToken.cancelled) break;
try {
let { data: events } = await api.getInvoiceEvents(
let events: any[] = [];
await suppress_exceptions(is_intermittent_error, async () => {
let { data } = await api.getInvoiceEvents(
5,
ts.format("YYYY-MM-DDTHH:mm:ss.SSSSSSZ")
);
for (let ev of events) {
logger.debug(
`Received invoice event: ${JSON.stringify(
ev
)}, type: ${JSON.stringify(Object.getPrototypeOf(ev))}`
);
ts = dayjs(ev.eventDate);
if (ev.eventType === "InvoiceReceivedEvent") {
let invoice = await self.invoice(ev["invoiceId"]);
yield invoice;
}
}
if (!events || !events.length) {
await sleep(1);
events = data;
});
for (let ev of events) {
logger.debug(
`Received invoice event: ${JSON.stringify(ev)}, ` +
`type: ${JSON.stringify(Object.getPrototypeOf(ev))}`
);
ts = dayjs(ev.eventDate);
if (ev.eventType === "InvoiceReceivedEvent") {
let invoice = await self.invoice(ev["invoiceId"]);
yield invoice;
}
} catch (error) {
logger.error(`Received invoice error: ${error}`);
}
if (!events.length) {
await sleep(1);
}
}
return;
Expand All @@ -361,28 +365,27 @@ export class Payment {
let ts = init_ts;
while (true) {
if (cancellationToken.cancelled) break;
try {
let { data: events } = await api.getDebitNoteEvents(
let events: any[] = [];
await suppress_exceptions(is_intermittent_error, async () => {
let { data } = await api.getDebitNoteEvents(
5,
ts.format("YYYY-MM-DDTHH:mm:ss.SSSSSSZ")
);
for (let ev of events) {
logger.debug(
`Received debit note event: ${JSON.stringify(
ev
)}, type: ${JSON.stringify(Object.getPrototypeOf(ev))}`
);
ts = dayjs(ev.eventDate);
if (ev.eventType === "DebitNoteReceivedEvent") {
let debit_note = await self.debit_note(ev["debitNoteId"]);
yield debit_note;
}
}
if (!events || !events.length) {
await sleep(1);
events = data;
});
for (let ev of events) {
logger.debug(
`Received debit note event: ${JSON.stringify(ev)}, ` +
`type: ${JSON.stringify(Object.getPrototypeOf(ev))}`
);
ts = dayjs(ev.eventDate);
if (ev.eventType === "DebitNoteReceivedEvent") {
let debit_note = await self.debit_note(ev["debitNoteId"]);
yield debit_note;
}
} catch (error) {
logger.error(`Received debit note error: ${error}`);
}
if (!events.length) {
await sleep(1);
}
}
return;
Expand Down

0 comments on commit 25a3fe5

Please sign in to comment.