From 25a3fe511f35f3dde67764cfdf53cc1f753272f4 Mon Sep 17 00:00:00 2001 From: filipgolem <44880692+filipgolem@users.noreply.github.com> Date: Thu, 20 May 2021 10:37:24 +0200 Subject: [PATCH] Repeat API calls on timeouts (#181) * Add repeat_on_error and suppress_exceptions functions * Use suppress_exceptions function in incoming_invoices and incoming_debit_notes * Use suppress_exceptions in Subscription.events() * Use repeat_on_error() in Invoice.accept, DebitNote.accept, Payment.invoice, Payment.debit_note, Allocation.details and Allocation.delete --- yajsapi/executor/ctx.ts | 2 +- yajsapi/rest/activity.ts | 2 +- yajsapi/rest/common.ts | 56 ++++++++++++++++++++ yajsapi/rest/market.ts | 14 ++--- yajsapi/rest/payment.ts | 109 ++++++++++++++++++++------------------- 5 files changed, 121 insertions(+), 62 deletions(-) create mode 100644 yajsapi/rest/common.ts diff --git a/yajsapi/executor/ctx.ts b/yajsapi/executor/ctx.ts index 26307f2f4..fbd16f3fe 100755 --- a/yajsapi/executor/ctx.ts +++ b/yajsapi/executor/ctx.ts @@ -365,7 +365,7 @@ export class WorkContext { * * @returns Work object (the latter contains sequence commands added before calling this method) */ - commit({ timeout }: { timeout?: number }): Work { + commit({ timeout }: { timeout?: number } = {}): Work { let steps = this._pending_steps; this._pending_steps = []; return new _Steps(steps, timeout); diff --git a/yajsapi/rest/activity.ts b/yajsapi/rest/activity.ts index 7020e8c73..b25576239 100755 --- a/yajsapi/rest/activity.ts +++ b/yajsapi/rest/activity.ts @@ -240,7 +240,7 @@ class Activity { try { await this._api.destroyActivity(this._id, { timeout: 11000, params: { timeout: 10 } }); } catch (error) { - logger.error(`Got API Exception when destroying activity ${this._id}: ${error}`); + logger.warn(`Got API Exception when destroying activity ${this._id}: ${error}`); } } } diff --git a/yajsapi/rest/common.ts b/yajsapi/rest/common.ts new file mode 100644 index 000000000..08d955df8 --- /dev/null +++ b/yajsapi/rest/common.ts @@ -0,0 +1,56 @@ +import { Callable, logger, sleep } from "../utils"; + +export function is_intermittent_error(e) { + if (e.response && (e.response.status === 408 || e.response.status === 504)) { return true; } + if (e.code === "ECONNABORTED" && e.message && e.message.includes("timeout")) { return true; } + if (e.code === "ETIMEDOUT") { return true; } + if (e.code === "EPIPE") { return true; } + return false; +} + +export async function suppress_exceptions( + condition: Callable, + block: Callable, + report_exceptions: boolean = true +): Promise { + try { + return await block(); + } catch (error) { + if (condition(error)) { + logger.debug(`Exception suppressed: ${error}`); + } else { + throw error; + } + } +} + +export async function repeat_on_error( + block: Callable, + max_tries: number = 5, + max_duration_ms = 15000, + interval: number = 0.0, + condition: Callable = is_intermittent_error +) { + let start_time = Date.now(); + for (let try_num = 1; try_num <= max_tries; ++try_num) { + if (try_num > 1) { + await sleep(Math.min(interval, start_time + max_duration_ms - Date.now())); + } + let err_in_block, ret_value; + await suppress_exceptions(condition, async () => { + try { + ret_value = await block(); + } catch (error) { + err_in_block = error; + throw error; + } + }); + if (err_in_block === undefined) { return ret_value; } + const duration = Date.now() - start_time; + const repeat = try_num < max_tries && duration < max_duration_ms; + const msg = `API call timed out (attempt ${try_num}/${max_tries}), ` + + (repeat ? `retrying in ${interval}s` : "giving up after ${duration}ms"); + logger.debug(msg); + if (!repeat) { throw err_in_block; } + } +} diff --git a/yajsapi/rest/market.ts b/yajsapi/rest/market.ts index 2388ae991..8c4d70534 100755 --- a/yajsapi/rest/market.ts +++ b/yajsapi/rest/market.ts @@ -5,6 +5,7 @@ import { logger, sleep } from "../utils"; import { RequestorApi } from "ya-ts-client/dist/ya-market/api"; import * as models from "ya-ts-client/dist/ya-market/src/models"; import { Configuration } from "ya-ts-client/dist/ya-activity"; +import { suppress_exceptions, is_intermittent_error } from "./common"; dayjs.extend(utc); @@ -237,20 +238,19 @@ export class Subscription { async *events(cancellationToken?): AsyncGenerator { while (this._open) { if (cancellationToken && cancellationToken.cancelled) break; + let proposals: any[] = []; try { - let { data: proposals } = await this._api.collectOffers( - this._id, - 3, - 10, - { timeout: 5000 } - ); + await suppress_exceptions(is_intermittent_error, async () => { + let { data } = await this._api.collectOffers(this._id, 3, 10, { timeout: 5000 }); + proposals = data; + }); for (let _proposal of proposals) { if (cancellationToken && cancellationToken.cancelled) return; if (_proposal.eventType === "ProposalEvent") { yield new OfferProposal(this, _proposal as models.ProposalEvent); } } - if (!proposals || !proposals.length) { + if (!proposals.length) { await sleep(2); } } catch (error) { diff --git a/yajsapi/rest/payment.ts b/yajsapi/rest/payment.ts index 34d3fb313..6227cef4d 100755 --- a/yajsapi/rest/payment.ts +++ b/yajsapi/rest/payment.ts @@ -5,6 +5,7 @@ import * as yap from "ya-ts-client/dist/ya-payment/src/models"; import { Configuration } from "ya-ts-client/dist/ya-activity"; import { RequestorApi } from "ya-ts-client/dist/ya-payment/api"; import { logger, sleep } from "../utils"; +import { is_intermittent_error, repeat_on_error, suppress_exceptions } from "./common"; dayjs.extend(utc); @@ -58,7 +59,9 @@ export class Invoice extends yInvoice { }; acceptance!.totalAmountAccepted = amount.toString(); acceptance!.allocationId = allocation.id; - await this._api.acceptInvoice(this.invoiceId, acceptance!, undefined, { timeout: 15000 }); + await repeat_on_error(async () => { + await this._api.acceptInvoice(this.invoiceId, acceptance!, undefined, { timeout: 7000 }); + }); } } @@ -81,7 +84,9 @@ export class DebitNote extends yDebitNote { }; acceptance!.totalAmountAccepted = amount.toString(); acceptance!.allocationId = allocation.id; - await this._api.acceptDebitNote(this.debitNoteId, acceptance!, undefined, { timeout: 15000 }); + await repeat_on_error(async () => { + await this._api.acceptDebitNote(this.debitNoteId, acceptance!, undefined, { timeout: 7000 }); + }); } } @@ -113,27 +118,24 @@ export class Allocation extends _Link { //"Allocation expiration timestamp" async details(): Promise { - let allocationDetails = new AllocationDetails(); - - try { + return await repeat_on_error(async () => { + let allocationDetails = new AllocationDetails(); let { data: details, - }: { data: yap.Allocation } = await this._api.getAllocation(this.id, { timeout: 15000 }); + }: { data: yap.Allocation } = await this._api.getAllocation(this.id, { timeout: 7000 }); allocationDetails.spent_amount = parseFloat(details.spentAmount); allocationDetails.remaining_amount = parseFloat(details.remainingAmount); - } catch (error) { - logger.error(error); - throw new Error(error); - } - - return allocationDetails; + return allocationDetails; + }); } async delete() { try { - await this._api.releaseAllocation(this.id, { timeout: 15000 }); + await repeat_on_error(async () => { + await this._api.releaseAllocation(this.id, { timeout: 7000 }); + }); } catch(error) { - logger.error(`Release allocation: ${error}`); + logger.error(`Release allocation error: ${error}`); } } } @@ -291,7 +293,9 @@ export class Payment { } async debit_note(debit_note_id: string): Promise { - let { data: debit_note_obj } = await this._api.getDebitNote(debit_note_id, { timeout: 5000 }); + let debit_note_obj = await repeat_on_error(async () => { + return (await this._api.getDebitNote(debit_note_id, { timeout: 5000 })).data; + }); // TODO may need to check only requestor debit notes return new DebitNote(this._api, debit_note_obj) } @@ -307,9 +311,10 @@ export class Payment { } async invoice(invoice_id: string): Promise { - let { data: invoice_obj } = await this._api.getInvoice(invoice_id, { timeout: 5000 }); + let invoice_obj = await repeat_on_error(async () => { + return (await this._api.getInvoice(invoice_id, { timeout: 5000 })).data; + }); // TODO may need to check only requestor invoices - // logger.log("debug", `got=${JSON.stringify(invoice_obj)}`); return new Invoice(this._api, invoice_obj); } @@ -322,28 +327,27 @@ export class Payment { let ts = init_ts; while (true) { if (cancellationToken.cancelled) break; - try { - let { data: events } = await api.getInvoiceEvents( + let events: any[] = []; + await suppress_exceptions(is_intermittent_error, async () => { + let { data } = await api.getInvoiceEvents( 5, ts.format("YYYY-MM-DDTHH:mm:ss.SSSSSSZ") ); - for (let ev of events) { - logger.debug( - `Received invoice event: ${JSON.stringify( - ev - )}, type: ${JSON.stringify(Object.getPrototypeOf(ev))}` - ); - ts = dayjs(ev.eventDate); - if (ev.eventType === "InvoiceReceivedEvent") { - let invoice = await self.invoice(ev["invoiceId"]); - yield invoice; - } - } - if (!events || !events.length) { - await sleep(1); + events = data; + }); + for (let ev of events) { + logger.debug( + `Received invoice event: ${JSON.stringify(ev)}, ` + + `type: ${JSON.stringify(Object.getPrototypeOf(ev))}` + ); + ts = dayjs(ev.eventDate); + if (ev.eventType === "InvoiceReceivedEvent") { + let invoice = await self.invoice(ev["invoiceId"]); + yield invoice; } - } catch (error) { - logger.error(`Received invoice error: ${error}`); + } + if (!events.length) { + await sleep(1); } } return; @@ -361,28 +365,27 @@ export class Payment { let ts = init_ts; while (true) { if (cancellationToken.cancelled) break; - try { - let { data: events } = await api.getDebitNoteEvents( + let events: any[] = []; + await suppress_exceptions(is_intermittent_error, async () => { + let { data } = await api.getDebitNoteEvents( 5, ts.format("YYYY-MM-DDTHH:mm:ss.SSSSSSZ") ); - for (let ev of events) { - logger.debug( - `Received debit note event: ${JSON.stringify( - ev - )}, type: ${JSON.stringify(Object.getPrototypeOf(ev))}` - ); - ts = dayjs(ev.eventDate); - if (ev.eventType === "DebitNoteReceivedEvent") { - let debit_note = await self.debit_note(ev["debitNoteId"]); - yield debit_note; - } - } - if (!events || !events.length) { - await sleep(1); + events = data; + }); + for (let ev of events) { + logger.debug( + `Received debit note event: ${JSON.stringify(ev)}, ` + + `type: ${JSON.stringify(Object.getPrototypeOf(ev))}` + ); + ts = dayjs(ev.eventDate); + if (ev.eventType === "DebitNoteReceivedEvent") { + let debit_note = await self.debit_note(ev["debitNoteId"]); + yield debit_note; } - } catch (error) { - logger.error(`Received debit note error: ${error}`); + } + if (!events.length) { + await sleep(1); } } return;