From ecffa7fb326534eedfc4177b34d10ba88a6f4178 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Thu, 20 Jun 2024 14:13:04 +0200 Subject: [PATCH 1/2] perf: speed up yagna shutdown --- src/golem-network/golem-network.ts | 1 - src/payment/api.ts | 3 -- src/shared/utils/wait.ts | 10 +++-- .../yagna/adapters/payment-api-adapter.ts | 5 --- src/shared/yagna/event-reader.ts | 40 ++++++++++--------- src/shared/yagna/yagnaApi.ts | 12 +++--- 6 files changed, 34 insertions(+), 37 deletions(-) diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index 317d5a45d..7e1cd1331 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -322,7 +322,6 @@ export class GolemNetwork { async disconnect() { await Promise.allSettled(this.cleanupTasks.map((task) => task())); await this.storageProvider.close(); - await this.services.paymentApi.disconnect(); await this.yagna.disconnect(); this.services.proposalCache.flushAll(); diff --git a/src/payment/api.ts b/src/payment/api.ts index 19279385b..2d5a0c6cc 100644 --- a/src/payment/api.ts +++ b/src/payment/api.ts @@ -34,9 +34,6 @@ export interface IPaymentApi { /** Starts the reader logic */ connect(): Promise; - /** Terminates the reader logic */ - disconnect(): Promise; - getInvoice(id: string): Promise; acceptInvoice(invoice: Invoice, allocation: Allocation, amount: string): Promise; diff --git a/src/shared/utils/wait.ts b/src/shared/utils/wait.ts index 5782b60be..f35c644d0 100644 --- a/src/shared/utils/wait.ts +++ b/src/shared/utils/wait.ts @@ -12,8 +12,10 @@ import { GolemTimeoutError } from "../error/golem-error"; */ export function waitForCondition( check: () => boolean | Promise, - opts = { timeoutSeconds: 30, intervalSeconds: 1 }, + opts?: { timeoutSeconds?: number; intervalSeconds?: number }, ): Promise { + const timeoutSeconds = opts?.timeoutSeconds ?? 30; + const intervalSeconds = opts?.intervalSeconds ?? 1; let verifyInterval: NodeJS.Timeout | undefined; let waitTimeout: NodeJS.Timeout | undefined; @@ -22,13 +24,13 @@ export function waitForCondition( if (await check()) { resolve(); } - }, opts.intervalSeconds * 1000); + }, intervalSeconds * 1000); }); const wait = new Promise((_, reject) => { waitTimeout = setTimeout(() => { - reject(new GolemTimeoutError(`Condition was not met within ${opts.timeoutSeconds}s`)); - }, opts.timeoutSeconds * 1000); + reject(new GolemTimeoutError(`Condition was not met within ${timeoutSeconds}s`)); + }, timeoutSeconds * 1000); }); return Promise.race([verify, wait]).finally(() => { diff --git a/src/shared/yagna/adapters/payment-api-adapter.ts b/src/shared/yagna/adapters/payment-api-adapter.ts index 33033fd8d..9e6156497 100644 --- a/src/shared/yagna/adapters/payment-api-adapter.ts +++ b/src/shared/yagna/adapters/payment-api-adapter.ts @@ -71,11 +71,6 @@ export class PaymentApiAdapter implements IPaymentApi { return this.debitNoteRepo.getById(id); } - async disconnect() { - this.logger.debug("Disconnecting Payment API Adapter"); - this.logger.debug("Payment API Adapter disconnected"); - } - async acceptInvoice(invoice: Invoice, allocation: Allocation, amount: string): Promise { try { await this.yagna.payment.acceptInvoice(invoice.id, { diff --git a/src/shared/yagna/event-reader.ts b/src/shared/yagna/event-reader.ts index b46f96d2b..9322b9d7b 100644 --- a/src/shared/yagna/event-reader.ts +++ b/src/shared/yagna/event-reader.ts @@ -7,24 +7,22 @@ export type CancellablePoll = { /** User defined name of the event stream for ease of debugging */ eventType: string; - /** Tells if a poll call is currently active - reader */ - isBusy: boolean; - - /** Tells if the poll is active in general. If it's 'false' it means that the poll was cancelled and no polling attempts will be done any more */ - isOnline: boolean; + /** Flag indicating if the reader is finished and no longer polling */ + isFinished: boolean; /** Triggers the poll using the fetcher provided when the CancellablePoll was created */ pollValues: () => AsyncGenerator; /** * Cancels the polling operations, stopping the reader - * - * It will wait for the last read to complete and will take the reader offline */ cancel: () => Promise; }; -export type EventsFetcherWithCursor = (lastEventTimestamp: string) => Promise; +type CancellablePromise = Promise & { cancel: () => void }; +export type CancellableEventsFetcherWithCursor = ( + lastEventTimestamp: string, +) => CancellablePromise; export class EventReader { public constructor(private readonly logger: Logger) {} @@ -39,24 +37,23 @@ export class EventReader { public createReader( eventType: string, - eventsFetcher: EventsFetcherWithCursor, + eventsFetcher: CancellableEventsFetcherWithCursor, ): CancellablePoll { - let isBusy = false; - let isOnline = true; + let isFinished = false; let keepReading = true; + let currentPoll: CancellablePromise | null = null; let lastTimestamp = new Date().toISOString(); const logger = this.logger; return { eventType, - isBusy, - isOnline, + isFinished, pollValues: async function* () { while (keepReading) { try { - isBusy = true; - const events = await eventsFetcher(lastTimestamp); + currentPoll = eventsFetcher(lastTimestamp); + const events = await currentPoll; logger.debug("Polled events from Yagna", { eventType, count: events.length, @@ -67,17 +64,22 @@ export class EventReader { lastTimestamp = event.eventDate; } } catch (error) { + if (typeof error === "object" && error.name === "CancelError") { + logger.debug("Polling was cancelled", { eventType }); + continue; + } logger.error("Error fetching events from Yagna", { eventType, error }); - } finally { - isBusy = false; } } logger.debug("Stopped reading events", { eventType }); - isOnline = false; + isFinished = true; }, cancel: async function () { keepReading = false; - await waitForCondition(() => !isBusy && !isOnline); + if (currentPoll) { + currentPoll.cancel(); + } + await waitForCondition(() => isFinished, { intervalSeconds: 0 }); logger.debug("Cancelled reading the events", { eventType }); }, }; diff --git a/src/shared/yagna/yagnaApi.ts b/src/shared/yagna/yagnaApi.ts index dc1ca4333..44771b04f 100644 --- a/src/shared/yagna/yagnaApi.ts +++ b/src/shared/yagna/yagnaApi.ts @@ -246,19 +246,21 @@ export class YagnaApi { private async stopPollingEvents() { this.logger.debug("Stopping polling events from Yagna"); + const promises: Promise[] = []; if (this.invoiceEventPoll) { - await this.invoiceEventPoll.cancel(); + promises.push(this.invoiceEventPoll.cancel()); } if (this.debitNoteEventsPoll) { - await this.debitNoteEventsPoll.cancel(); + promises.push(this.debitNoteEventsPoll.cancel()); } if (this.agreementEventsPoll) { - await this.agreementEventsPoll.cancel(); + promises.push(this.agreementEventsPoll.cancel()); } + await Promise.allSettled(promises); - this.logger.debug("Stopped polling events form Yagna"); + this.logger.debug("Stopped polling events from Yagna"); } private async assertSupportedVersion() { @@ -272,7 +274,7 @@ export class YagnaApi { if (!normVersion) { throw new GolemPlatformError( - `Unreadable yana version '${version}'. Can't proceed without checking yagna version support status.`, + `Unreadable yagna version '${version}'. Can't proceed without checking yagna version support status.`, ); } From d13da74c187e4ada9a8d14a48e2d2f5c285c9f68 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Thu, 20 Jun 2024 14:18:15 +0200 Subject: [PATCH 2/2] test: fix typo in test --- src/shared/yagna/yagna.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/shared/yagna/yagna.spec.ts b/src/shared/yagna/yagna.spec.ts index c67c10500..1a70deccf 100644 --- a/src/shared/yagna/yagna.spec.ts +++ b/src/shared/yagna/yagna.spec.ts @@ -84,7 +84,7 @@ describe("Yagna Utils", () => { await expect(() => y.connect()).rejects.toMatchError( new GolemPlatformError( - `Unreadable yana version 'broken'. Can't proceed without checking yagna version support status.`, + `Unreadable yagna version 'broken'. Can't proceed without checking yagna version support status.`, ), ); });