From a61115bed3e8b7f410a1727dd8938d258b9c58e7 Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Wed, 17 Apr 2024 21:12:15 +0200 Subject: [PATCH] feat(yagna): added behaviour subjects for events in yagna-api to support basic event-bus cases * refactor: switched the existing codebase to the subjects * refactor: added graceful shutdown behaviour for yagna.disconnect * chore: applied remark from CR --- package-lock.json | 13 +- package.json | 3 +- src/activity/work/pool-old.ts | 1 + src/agreement/service.ts | 32 ++-- src/experimental/job/job_manager.ts | 1 + src/golem-network.ts | 1 + src/payment/payments.ts | 131 +++++++--------- src/shared/utils/index.ts | 4 +- src/shared/utils/waitForCondition.ts | 40 +++++ src/shared/utils/yagna/yagnaApi.ts | 180 +++++++++++++++++++++- tests/unit/agreement_pool_service.test.ts | 4 - tests/unit/payment_service.test.ts | 46 ++++-- 12 files changed, 319 insertions(+), 137 deletions(-) create mode 100644 src/shared/utils/waitForCondition.ts diff --git a/package-lock.json b/package-lock.json index 295604d18..0a8a48fa2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -24,11 +24,12 @@ "generic-pool": "^3.9.0", "ip-num": "^1.5.1", "js-sha3": "^0.9.3", + "rxjs": "^7.8.1", "semver": "^7.5.4", "tmp": "^0.2.2", "uuid": "^9.0.1", "ws": "^8.16.0", - "ya-ts-client": "^1.0.0" + "ya-ts-client": "^1.1.1-beta.1" }, "devDependencies": { "@commitlint/cli": "^19.0.3", @@ -15357,8 +15358,8 @@ }, "node_modules/rxjs": { "version": "7.8.1", - "dev": true, - "license": "Apache-2.0", + "resolved": "https://registry.npmjs.org/rxjs/-/rxjs-7.8.1.tgz", + "integrity": "sha512-AA3TVj+0A2iuIoQkWEK/tqFjBq2j+6PO6Y0zJcvzLAFhEFIO3HL0vls9hWLncZbAAbK0mar7oZ4V079I/qPMxg==", "dependencies": { "tslib": "^2.1.0" } @@ -16962,7 +16963,6 @@ }, "node_modules/tslib": { "version": "2.6.2", - "dev": true, "license": "0BSD" }, "node_modules/tslint-config-prettier": { @@ -17739,8 +17739,9 @@ } }, "node_modules/ya-ts-client": { - "version": "1.0.0", - "license": "LGPL-3.0", + "version": "1.1.1-beta.1", + "resolved": "https://registry.npmjs.org/ya-ts-client/-/ya-ts-client-1.1.1-beta.1.tgz", + "integrity": "sha512-rvkgvNphGdnm63hLe4hKnw8jvdavxEy1y7beLoAEn1nVhYRSF8unKveUdf0K2FbPhPNbiBT7vMEJVDF8x3fqFQ==", "engines": { "node": ">=18.0.0" } diff --git a/package.json b/package.json index 0e8cc4a1c..eb28478e4 100644 --- a/package.json +++ b/package.json @@ -72,11 +72,12 @@ "generic-pool": "^3.9.0", "ip-num": "^1.5.1", "js-sha3": "^0.9.3", + "rxjs": "^7.8.1", "semver": "^7.5.4", "tmp": "^0.2.2", "uuid": "^9.0.1", "ws": "^8.16.0", - "ya-ts-client": "^1.0.0" + "ya-ts-client": "^1.1.1-beta.1" }, "devDependencies": { "@commitlint/cli": "^19.0.3", diff --git a/src/activity/work/pool-old.ts b/src/activity/work/pool-old.ts index 05b8b60a4..75ca02cf9 100644 --- a/src/activity/work/pool-old.ts +++ b/src/activity/work/pool-old.ts @@ -142,6 +142,7 @@ export class ActivityPool { this.paymentService.end(), ]); await this.pool.clear(); + await this.yagnaApi.disconnect(); this.events.emit("end"); } diff --git a/src/agreement/service.ts b/src/agreement/service.ts index b8e878814..cc773b309 100644 --- a/src/agreement/service.ts +++ b/src/agreement/service.ts @@ -1,9 +1,8 @@ import Bottleneck from "bottleneck"; -import { Logger, YagnaApi, defaultLogger, sleep } from "../shared/utils"; +import { defaultLogger, Logger, sleep, YagnaApi } from "../shared/utils"; import { Agreement, AgreementOptions } from "./agreement"; import { AgreementServiceConfig } from "./config"; import { GolemMarketError, MarketErrorCode, Proposal } from "../market"; -import { MarketApi } from "ya-ts-client"; export interface AgreementDTO { id: string; @@ -231,30 +230,17 @@ export class AgreementPoolService { } private async subscribeForAgreementEvents() { - let afterTimestamp: string | undefined; - while (this.isServiceRunning) { - try { - const events = (await this.yagnaApi.market.collectAgreementEvents( - this.config.agreementEventsFetchingIntervalSec, - afterTimestamp, - this.config.agreementMaxEvents, - this.yagnaApi.appSessionId, - )) as Array; - events.forEach((event) => { - afterTimestamp = event.eventDate; - // @ts-expect-error: Bug in ya-tsclient: typo in eventtype #FIXME - report to core - if (event.eventtype === "AgreementTerminatedEvent") { - this.handleTerminationAgreementEvent(event.agreementId, event.reason); - } - }); - } catch (error) { - this.logger.debug(`Unable to get agreement events.`, error); - await sleep(2); + this.yagnaApi.agreementEvents$.subscribe((event) => { + this.logger.debug("Received agreement operation event", { event }); + if (event) { + if (event.eventType === "AgreementTerminatedEvent" && "reason" in event) { + this.handleTerminationAgreementEvent(event.agreementId, event.reason); + } } - } + }); } - private async handleTerminationAgreementEvent(agreementId: string, reason?: { [key: string]: string }) { + private async handleTerminationAgreementEvent(agreementId: string, reason?: Record) { const agreement = this.agreements.get(agreementId); if (agreement) { await agreement.terminate(reason); diff --git a/src/experimental/job/job_manager.ts b/src/experimental/job/job_manager.ts index e8eb41542..484a61b96 100644 --- a/src/experimental/job/job_manager.ts +++ b/src/experimental/job/job_manager.ts @@ -58,6 +58,7 @@ export class JobManager { public async close() { const pendingJobs = Array.from(this.jobs.values()).filter((job) => job.isRunning()); await Promise.allSettled(pendingJobs.map((job) => job.cancel())); + await this.yagna?.disconnect(); this.yagna = null; } diff --git a/src/golem-network.ts b/src/golem-network.ts index c757b4eab..f69c12878 100644 --- a/src/golem-network.ts +++ b/src/golem-network.ts @@ -84,6 +84,7 @@ export class GolemNetwork { * @return Resolves when all shutdown steps are completed */ async disconnect() { + await this.yagna.disconnect(); this.events.emit("disconnected"); } diff --git a/src/payment/payments.ts b/src/payment/payments.ts index 53d557b93..fb673db9a 100644 --- a/src/payment/payments.ts +++ b/src/payment/payments.ts @@ -1,9 +1,10 @@ import { BasePaymentOptions, PaymentConfig } from "./config"; -import { Logger, sleep, YagnaApi } from "../shared/utils"; +import { Logger, YagnaApi } from "../shared/utils"; import { Invoice } from "./invoice"; import { DebitNote } from "./debit_note"; import { GolemTimeoutError } from "../shared/error/golem-error"; import { EventEmitter } from "eventemitter3"; +import { Subscription } from "rxjs"; export interface PaymentEvents { invoiceReceived: (invoice: Invoice) => void; @@ -19,13 +20,14 @@ export interface PaymentOptions extends BasePaymentOptions { } export class Payments { - private isRunning = true; private options: PaymentConfig; private logger: Logger; - private lastInvoiceFetchingTime: string = new Date().toISOString(); - private lastDebitNotesFetchingTime: string = new Date().toISOString(); + public readonly events = new EventEmitter(); + private debitNoteSubscription: Subscription | null = null; + private invoiceSubscription: Subscription | null = null; + static async create(yagnaApi: YagnaApi, options?: PaymentOptions) { return new Payments(yagnaApi, new PaymentConfig(options)); } @@ -54,91 +56,66 @@ export class Payments { ), this.options.unsubscribeTimeoutMs, ); - this.events.on("unsubscribed", () => { - this.logger.debug(`Payments unsubscribed`); - clearTimeout(timeoutId); - resolve(true); - }); - this.isRunning = false; + + this.debitNoteSubscription?.unsubscribe(); + this.invoiceSubscription?.unsubscribe(); + + clearTimeout(timeoutId); + + resolve(true); }); } private async subscribe() { - this.subscribeForInvoices().catch((error) => this.logger.error(`Unable to collect invoices.`, { error })); - this.subscribeForDebitNotes().catch((error) => this.logger.error(`Unable to collect debit notes.`, { error })); + this.subscribeForInvoices(); + this.subscribeForDebitNotes(); } - private async subscribeForInvoices() { - while (this.isRunning) { - try { - const invoiceEvents = await this.yagnaApi.payment.getInvoiceEvents( - this.options.invoiceFetchingInterval / 1000, - this.lastInvoiceFetchingTime, - this.options.maxInvoiceEvents, - this.yagnaApi.appSessionId, - ); - for (const event of invoiceEvents) { - if (!this.isRunning) break; - if (event.eventType !== "InvoiceReceivedEvent") continue; - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore FIXME: ya-ts-client does not provide invoiceId in the event even though it is in the API response - const invoiceId = event["invoiceId"]; - const invoice = await Invoice.create(invoiceId, this.yagnaApi, { ...this.options }).catch((error) => - this.logger.error(`Unable to create invoice`, { id: invoiceId, error }), - ); - if (!invoice) continue; - this.events.emit("invoiceReceived", invoice); - this.lastInvoiceFetchingTime = event.eventDate; - this.logger.debug(`New Invoice received`, { - id: invoice.id, - agreementId: invoice.agreementId, - amount: invoice.amount, - }); + private subscribeForInvoices() { + this.invoiceSubscription = this.yagnaApi.invoiceEvents$.subscribe(async (event) => { + this.logger.debug("Received invoice event from Yagna", { event }); + + if (event && event.eventType === "InvoiceReceivedEvent") { + if (event.invoiceId) { + try { + const invoice = await Invoice.create(event.invoiceId, this.yagnaApi, { ...this.options }); + this.events.emit("invoiceReceived", invoice); + this.logger.debug(`New Invoice received`, { + id: invoice.id, + agreementId: invoice.agreementId, + amount: invoice.amount, + }); + } catch (err) { + this.logger.error(`Unable to create invoice`, { event, err }); + } + } else { + this.logger.warn("Received invoice event without invoice ID", { event }); } - } catch (error) { - const reason = error.response?.data?.message || error.message || error; - this.logger.error(`Unable to get invoices.`, { reason }); - await sleep(2); } - } - this.events.emit("unsubscribed"); + }); } - private async subscribeForDebitNotes() { - while (this.isRunning) { - try { - const debitNotesEvents = await this.yagnaApi.payment - .getDebitNoteEvents( - this.options.debitNotesFetchingInterval / 1000, - this.lastDebitNotesFetchingTime, - this.options.maxDebitNotesEvents, - this.yagnaApi.appSessionId, - ) - .catch(() => []); - - for (const event of debitNotesEvents) { - if (!this.isRunning) return; - if (event.eventType !== "DebitNoteReceivedEvent") continue; - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore FIXME: ya-ts-client does not provide debitNoteId in the event even though it is in the API response - const debitNoteId = event["debitNoteId"]; - const debitNote = await DebitNote.create(debitNoteId, this.yagnaApi, { ...this.options }).catch((error) => - this.logger.error(`Unable to create debit note`, { id: debitNoteId, error }), - ); - if (!debitNote) continue; - this.events.emit("debitNoteReceived", debitNote); - this.lastDebitNotesFetchingTime = event.eventDate; - this.logger.debug("New Debit Note received", { - agreementId: debitNote.agreementId, - amount: debitNote.totalAmountDue, - }); + private subscribeForDebitNotes() { + this.debitNoteSubscription = this.yagnaApi.debitNoteEvents$.subscribe(async (event) => { + this.logger.debug("Received debit note event from Yagna", { event }); + + if (event && event.eventType === "DebitNoteReceivedEvent") { + if (event.debitNoteId) { + try { + const debitNote = await DebitNote.create(event.debitNoteId, this.yagnaApi, { ...this.options }); + this.events.emit("debitNoteReceived", debitNote); + this.logger.debug("New Debit Note received", { + agreementId: debitNote.agreementId, + amount: debitNote.totalAmountDue, + }); + } catch (err) { + this.logger.error(`Unable to create debit note`, { event, err }); + } + } else { + this.logger.warn("Received debit note event without debit note ID", { event }); } - } catch (error) { - const reason = error.response?.data?.message || error; - this.logger.error(`Unable to get debit notes.`, { reason }); - await sleep(2); } - } + }); } } diff --git a/src/shared/utils/index.ts b/src/shared/utils/index.ts index 7361f91a7..23e8c0203 100644 --- a/src/shared/utils/index.ts +++ b/src/shared/utils/index.ts @@ -1,9 +1,9 @@ import sleep from "./sleep"; + export { sleep }; export * as runtimeContextChecker from "./runtimeContextChecker"; export { Logger } from "./logger/logger"; export { nullLogger } from "./logger/nullLogger"; export { defaultLogger } from "./logger/defaultLogger"; export * as EnvUtils from "./env"; -export { YagnaApi, YagnaOptions } from "./yagna/yagnaApi"; -export { YagnaEventSubscription } from "./yagna/yagnaApi"; +export { YagnaApi, YagnaOptions, YagnaEventSubscription } from "./yagna/yagnaApi"; diff --git a/src/shared/utils/waitForCondition.ts b/src/shared/utils/waitForCondition.ts new file mode 100644 index 000000000..c43bdbe22 --- /dev/null +++ b/src/shared/utils/waitForCondition.ts @@ -0,0 +1,40 @@ +import { clearInterval } from "node:timers"; +import { GolemTimeoutError } from "../error/golem-error"; + +/** + * Utility function that helps to block the execution until a condition is met (check returns true) or the timeout happens. + * + * @param {function} check - The function checking if the condition is met. + * @param {Object} [opts] - Options controlling the timeout and check interval in seconds. + * @param {number} [opts.timeoutSeconds=15] - The timeout value in seconds. Default is 15 seconds. + * @param {number} [opts.intervalSeconds=1] - The interval between condition checks in seconds. Default is 1 second. + * + * @return {Promise} - Resolves when the condition is met or rejects with a timeout error if it wasn't met on time. + */ +export function waitForCondition( + check: () => boolean, + opts = { timeoutSeconds: 15, intervalSeconds: 1 }, +): Promise { + let verifyInterval: NodeJS.Timeout | undefined; + let waitTimeout: NodeJS.Timeout | undefined; + + const verify = new Promise((resolve) => { + verifyInterval = setInterval(() => { + if (check()) { + clearInterval(verifyInterval); + resolve(); + } + }, opts.intervalSeconds * 1000); + }); + + const wait = new Promise((_, reject) => { + waitTimeout = setTimeout(() => { + reject(new GolemTimeoutError(`Condition was not met within ${opts.timeoutSeconds}s`)); + }, opts.timeoutSeconds * 1000); + }); + + return Promise.race([verify, wait]).finally(() => { + clearInterval(verifyInterval); + clearTimeout(waitTimeout); + }); +} diff --git a/src/shared/utils/yagna/yagnaApi.ts b/src/shared/utils/yagna/yagnaApi.ts index ab6336417..925fff2c8 100644 --- a/src/shared/utils/yagna/yagnaApi.ts +++ b/src/shared/utils/yagna/yagnaApi.ts @@ -3,11 +3,12 @@ import * as EnvUtils from "../env"; import { GolemConfigError, GolemPlatformError } from "../../error/golem-error"; import { v4 } from "uuid"; import { Logger } from "../logger/logger"; -import { defaultLogger } from "../logger/defaultLogger"; - -// .js added for ESM compatibility +import { defaultLogger } from "../logger/defaultLogger"; // .js added for ESM compatibility import semverSatisfies from "semver/functions/satisfies.js"; import semverCoerce from "semver/functions/coerce.js"; +import { BehaviorSubject } from "rxjs"; +import { EventDTO } from "ya-ts-client/dist/market-api"; +import { waitForCondition } from "../waitForCondition"; export type YagnaOptions = { apiKey?: string; @@ -17,8 +18,46 @@ export type YagnaOptions = { export const MIN_SUPPORTED_YAGNA = "0.13.2"; +type CancellablePoll = { + /** User defined name of the event stream for ease of debugging */ + eventType: string; + + /** Tells if a poll call is currently active - reader */ + isBusy: boolean; + + /** Tells if the poll is active in general. If it's 'false' it means that the poll was cancelled and no polling attempts will be done any more */ + isOnline: boolean; + + /** Triggers the poll using the fetcher provided when the CancellablePoll was created */ + pollValues: () => AsyncGenerator; + + /** + * Cancels the polling operations, stopping the reader + * + * It will wait for the last read to complete and will take the reader offline + */ + cancel: () => Promise; +}; + +/** + * Utility type extracting the type of the element of a typed array + */ +type ElementOf = T extends Array ? U : never; + +// Workarounds for an issue with missing support for discriminators +// {@link https://github.com/ferdikoomen/openapi-typescript-codegen/issues/985} +export type YagnaAgreementOperationEvent = ElementOf< + Awaited> +>; +export type YagnaInvoiceEvent = ElementOf< + Awaited> +>; +export type YagnaDebitNoteEvent = ElementOf< + Awaited> +>; + /** - * Utility class that groups various Yagna APIs under a single wrapper + * Utility class that groups various Yagna APIs under a single wrapper, also an event consumer which produces events on rxjs BehaviourSubects */ export class YagnaApi { public readonly appSessionId: string; @@ -51,6 +90,15 @@ export class YagnaApi { public version: YaTsClient.VersionApi.DefaultService; + private debitNoteEventsPoll: CancellablePoll | null = null; + public debitNoteEvents$ = new BehaviorSubject(null); + + private invoiceEventPoll: CancellablePoll | null = null; + public invoiceEvents$ = new BehaviorSubject(null); + + private agreementEventsPoll: CancellablePoll | null = null; + public agreementEvents$ = new BehaviorSubject(null); + constructor(options?: YagnaOptions) { const apiKey = options?.apiKey || EnvUtils.getYagnaAppKey(); this.basePath = options?.basePath || EnvUtils.getYagnaApiUrl(); @@ -82,14 +130,14 @@ export class YagnaApi { this.payment = paymentClient.requestor; - const z = new YaTsClient.ActivityApi.Client({ + const activityApiClient = new YaTsClient.ActivityApi.Client({ BASE: `${this.basePath}/activity-api/v1`, HEADERS: commonHeaders, }); this.activity = { - control: z.requestorControl, - state: z.requestorState, + control: activityApiClient.requestorControl, + state: activityApiClient.requestorState, }; const netClient = new YaTsClient.NetApi.Client({ @@ -124,11 +172,127 @@ export class YagnaApi { this.appSessionId = v4(); } + /** + * Effectively starts the Yagna API client including subscribing to events exposed via rxjs subjects + */ async connect() { this.logger.info("Connecting to yagna"); + await this.assertSupportedVersion(); - return this.identity.getIdentity(); + const identity = this.identity.getIdentity(); + + this.startPollingEvents(); + + return identity; + } + + /** + * Terminates the Yagna API related activities + */ + async disconnect() { + await this.stopPollingEvents(); + } + + private startPollingEvents() { + this.logger.info("Starting to poll for events from Yagna"); + + const pollIntervalSec = 5; + const maxEvents = 100; + + this.agreementEventsPoll = this.createEventPoller("AgreementEvents", (lastEventTimestamp) => + this.market.collectAgreementEvents(pollIntervalSec, lastEventTimestamp, maxEvents, this.appSessionId), + ); + + this.debitNoteEventsPoll = this.createEventPoller("DebitNoteEvents", (lastEventTimestamp) => { + return this.payment.getDebitNoteEvents(pollIntervalSec, lastEventTimestamp, maxEvents, this.appSessionId); + }); + + this.invoiceEventPoll = this.createEventPoller("InvoiceEvents", (lastEventTimestamp) => + this.payment.getInvoiceEvents(pollIntervalSec, lastEventTimestamp, maxEvents, this.appSessionId), + ); + + // Run the readers and don't block execution + this.pollToSubject(this.agreementEventsPoll.pollValues(), this.agreementEvents$) + .then(() => this.logger.info("Finished polling agreement events from Yagna")) + .catch((err) => this.logger.error("Error while polling agreement events from Yagna", err)); + + this.pollToSubject(this.debitNoteEventsPoll.pollValues(), this.debitNoteEvents$) + .then(() => this.logger.info("Finished polling debit note events from Yagna")) + .catch((err) => this.logger.error("Error while polling debit note events from Yagna", err)); + + this.pollToSubject(this.invoiceEventPoll.pollValues(), this.invoiceEvents$) + .then(() => this.logger.info("Finished polling invoice events from Yagna")) + .catch((err) => this.logger.error("Error while polling invoice events from Yagna", err)); + } + + private async stopPollingEvents() { + this.logger.info("Stopping polling events from Yagna"); + + if (this.invoiceEventPoll) { + await this.invoiceEventPoll.cancel(); + } + + if (this.debitNoteEventsPoll) { + await this.debitNoteEventsPoll.cancel(); + } + + if (this.agreementEventsPoll) { + await this.agreementEventsPoll.cancel(); + } + + this.logger.info("Stopped polling events form Yagna"); + } + + private async pollToSubject(generator: AsyncGenerator, subject: BehaviorSubject) { + for await (const value of generator) { + subject.next(value); + } + } + + private createEventPoller( + eventType: string, + eventsFetcher: (lastEventTimestamp: string) => Promise, + ): CancellablePoll { + let isBusy = false; + let keepReading = true; + let lastTimestamp = new Date().toISOString(); + + const logger = this.logger; + + return { + eventType, + isBusy, + isOnline: true, + pollValues: async function* () { + while (keepReading) { + try { + isBusy = true; + const events = await eventsFetcher(lastTimestamp); + logger.debug("Polled events from Yagna", { + eventType, + count: events.length, + lastEventTimestamp: lastTimestamp, + }); + for (const event of events) { + yield event; + lastTimestamp = event.eventDate; + } + } catch (error) { + logger.error("Error fetching events from Yagna", { eventType, error }); + } finally { + isBusy = false; + } + } + logger.debug("Stopped reading events", { eventType }); + this.isOnline = false; + }, + cancel: async function () { + keepReading = false; + await waitForCondition(() => !this.isBusy && !this.isOnline); + logger.debug("Cancelled reading the events", { eventType }); + }, + }; } private async assertSupportedVersion() { diff --git a/tests/unit/agreement_pool_service.test.ts b/tests/unit/agreement_pool_service.test.ts index 231b49517..c02369f69 100644 --- a/tests/unit/agreement_pool_service.test.ts +++ b/tests/unit/agreement_pool_service.test.ts @@ -76,10 +76,6 @@ describe("Agreement Pool Service", () => { when(mockYagna.market).thenReturn(marketApi); - when(mockMarket.collectAgreementEvents(anything(), anything(), anything(), anything())).thenCall(() => - simulateLongPoll([]), - ); - when(mockMarket.createAgreement(anything())).thenResolve("agreement-id"); when(mockMarket.getAgreement("agreement-id")) diff --git a/tests/unit/payment_service.test.ts b/tests/unit/payment_service.test.ts index c0cc7042a..143537de5 100644 --- a/tests/unit/payment_service.test.ts +++ b/tests/unit/payment_service.test.ts @@ -2,10 +2,11 @@ import { Allocation, GolemPaymentError, PaymentErrorCode, PaymentFilters, Paymen import { debitNotes, debitNotesEvents, invoiceEvents, invoices } from "../fixtures"; import { anything, instance, mock, reset, when } from "@johanblumenberg/ts-mockito"; import { LoggerMock } from "../mock/utils/logger"; -import { Agreement, GolemUserError, YagnaApi } from "../../src"; +import { Agreement, GolemUserError, YagnaApi, YagnaDebitNoteEvent, YagnaInvoiceEvent } from "../../src"; import * as YaTsClient from "ya-ts-client"; import { simulateLongPoll } from "./helpers"; +import { BehaviorSubject } from "rxjs"; const logger = new LoggerMock(); @@ -130,12 +131,18 @@ describe("Payment Service", () => { invoiceFilter: PaymentFilters.acceptMaxAmountInvoiceFilter(7), }); - when(mockPayment.getInvoiceEvents(anything(), anything(), anything(), anything())).thenCall(() => - simulateLongPoll(invoiceEvents), - ); + const invoiceSubject$ = new BehaviorSubject(null); + const debitNoteSubject$ = new BehaviorSubject(null); + + when(mockYagna.invoiceEvents$).thenReturn(invoiceSubject$); + when(mockYagna.debitNoteEvents$).thenReturn(debitNoteSubject$); + when(mockPayment.getInvoice(anything())).thenResolve(invoices[0]); await paymentService.run(); + + invoiceEvents.forEach((e) => invoiceSubject$.next(e)); + expect(() => paymentService.acceptPayments(agreement)).toThrow( new GolemPaymentError( "You need to create an allocation before starting any payment processes", @@ -162,12 +169,11 @@ describe("Payment Service", () => { paymentTimeout: TEST_PAYMENT_TIMEOUT_MS, }); - when(mockPayment.getInvoiceEvents(anything(), anything(), anything(), anything())).thenCall(() => - simulateLongPoll([]), - ); - when(mockPayment.getDebitNoteEvents(anything(), anything(), anything(), anything())).thenCall(() => - simulateLongPoll(debitNotesEvents), - ); + const invoiceSubject$ = new BehaviorSubject(null); + const debitNoteSubject$ = new BehaviorSubject(null); + + when(mockYagna.invoiceEvents$).thenReturn(invoiceSubject$); + when(mockYagna.debitNoteEvents$).thenReturn(debitNoteSubject$); when(mockPayment.getDebitNote(anything())).thenResolve(debitNotes[0]); const handler = jest.fn(); @@ -177,6 +183,10 @@ describe("Payment Service", () => { await paymentService.createAllocation(); paymentService.acceptPayments(agreement); await paymentService.run(); + + invoiceSubject$.next(null); + debitNotesEvents.forEach((e) => debitNoteSubject$.next(e)); + await paymentService.end(); // Then @@ -195,12 +205,12 @@ describe("Payment Service", () => { paymentTimeout: TEST_PAYMENT_TIMEOUT_MS, }); - when(mockPayment.getDebitNoteEvents(anything(), anything(), anything(), anything())).thenCall(() => - simulateLongPoll([]), - ); - when(mockPayment.getInvoiceEvents(anything(), anything(), anything(), anything())).thenCall(() => - simulateLongPoll(invoiceEvents), - ); + const invoiceSubject$ = new BehaviorSubject(null); + const debitNoteSubject$ = new BehaviorSubject(null); + + when(mockYagna.invoiceEvents$).thenReturn(invoiceSubject$); + when(mockYagna.debitNoteEvents$).thenReturn(debitNoteSubject$); + when(mockPayment.getInvoice(anything())).thenResolve(invoices[0]); const handler = jest.fn(); @@ -210,6 +220,10 @@ describe("Payment Service", () => { await paymentService.createAllocation(); paymentService.acceptPayments(agreement); await paymentService.run(); + + debitNoteSubject$.next(null); + invoiceEvents.forEach((e) => invoiceSubject$.next(e)); + await paymentService.end(); // Then