From 4eb8a12f0671be988d3ee6326c544439f33d012e Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 29 May 2024 11:00:36 +0200 Subject: [PATCH] feat: remove start method from agreement payment process and start it automatically --- src/lease-process/lease.module.ts | 6 +- src/payment/agreement_payment_process.spec.ts | 3 + src/payment/agreement_payment_process.ts | 59 ++++++++----------- 3 files changed, 29 insertions(+), 39 deletions(-) diff --git a/src/lease-process/lease.module.ts b/src/lease-process/lease.module.ts index acdcc6511..2414ba9bd 100644 --- a/src/lease-process/lease.module.ts +++ b/src/lease-process/lease.module.ts @@ -10,7 +10,7 @@ import { LeaseProcessPool, LeaseProcessPoolOptions } from "./lease-process-pool" export interface LeaseModule { /** * Factory that creates a new lease process that's fully configured. - * This method will also create and start the payment process for the agreement. + * This method will also create the payment process for the agreement. * */ createLease(agreement: Agreement, allocation: Allocation, options?: LeaseProcessOptions): LeaseProcess; @@ -51,10 +51,6 @@ export class LeaseModuleImpl implements LeaseModule { this.deps.logger.child("lease-process"), options, ); - paymentProcess.start(); - lease.events.once("finalized", () => { - paymentProcess.stop(); - }); return lease; } diff --git a/src/payment/agreement_payment_process.spec.ts b/src/payment/agreement_payment_process.spec.ts index e6797279f..9d19cfec1 100644 --- a/src/payment/agreement_payment_process.spec.ts +++ b/src/payment/agreement_payment_process.spec.ts @@ -7,6 +7,7 @@ import { DebitNote } from "./debit_note"; import { GolemPaymentError, PaymentErrorCode } from "./error"; import { GolemUserError } from "../shared/error/golem-error"; import { IPaymentApi } from "./types"; +import { Subject } from "rxjs"; const agreementMock = mock(Agreement); const allocationMock = mock(Allocation); @@ -30,6 +31,8 @@ beforeEach(() => { when(agreementMock.getProviderInfo()).thenReturn(testProviderInfo); when(invoiceMock.provider).thenReturn(testProviderInfo); + when(mockPaymentApi.receivedInvoices$).thenReturn(new Subject()); + when(mockPaymentApi.receivedDebitNotes$).thenReturn(new Subject()); }); describe("AgreementPaymentProcess", () => { diff --git a/src/payment/agreement_payment_process.ts b/src/payment/agreement_payment_process.ts index e2e4494a8..86f7515af 100644 --- a/src/payment/agreement_payment_process.ts +++ b/src/payment/agreement_payment_process.ts @@ -35,7 +35,9 @@ export interface PaymentProcessOptions { } /** - * Process manager that controls the logic behind processing events related to an agreement which result with payments + * Process manager that controls the logic behind processing payments for an agreement (debit notes and invoices). + * The process is started automatically and ends when the final invoice is received. + * You can stop the process earlier by calling the `stop` method. You cannot restart the process after stopping it. */ export class AgreementPaymentProcess { private invoice: Invoice | null = null; @@ -51,7 +53,7 @@ export class AgreementPaymentProcess { public readonly logger: Logger; - private cleanup: (() => void) | null = null; + private readonly cleanupSubscriptions: () => void; constructor( public readonly agreement: Agreement, @@ -65,6 +67,19 @@ export class AgreementPaymentProcess { invoiceFilter: options?.invoiceFilter || (() => true), debitNoteFilter: options?.debitNoteFilter || (() => true), }; + + const invoiceSubscription = this.paymentApi.receivedInvoices$ + .pipe(filter((invoice) => invoice.agreementId === this.agreement.id)) + .subscribe((invoice) => this.addInvoice(invoice)); + + const debitNoteSubscription = this.paymentApi.receivedDebitNotes$ + .pipe(filter((debitNote) => debitNote.agreementId === this.agreement.id)) + .subscribe((debitNote) => this.addDebitNote(debitNote)); + + this.cleanupSubscriptions = () => { + invoiceSubscription.unsubscribe(); + debitNoteSubscription.unsubscribe(); + }; } /** @@ -191,6 +206,11 @@ export class AgreementPaymentProcess { } } + private finalize(invoice: Invoice) { + this.invoice = invoice; + this.cleanupSubscriptions(); + } + private async applyInvoice(invoice: Invoice) { this.logger.debug("Applying invoice for agreement", { invoiceId: invoice.id, @@ -218,7 +238,7 @@ export class AgreementPaymentProcess { ); } - this.invoice = invoice; + this.finalize(invoice); let acceptedByFilter = false; try { @@ -298,39 +318,10 @@ export class AgreementPaymentProcess { } public isStarted() { - return this.cleanup !== null; - } - - /** - * Subscribe to payment events and add each invoice and debit note - * to the payment process. - * To stop the subscription, call `.stop()`. - */ - public start(): void { - if (this.isStarted()) { - throw new GolemUserError("Payment process already started"); - } - const invoiceSubscription = this.paymentApi.receivedInvoices$ - .pipe(filter((invoice) => invoice.agreementId === this.agreement.id)) - .subscribe(async (invoice) => { - await this.addInvoice(invoice); - }); - - const debitNoteSubscription = this.paymentApi.receivedDebitNotes$ - .pipe(filter((debitNote) => debitNote.agreementId === this.agreement.id)) - .subscribe(async (debitNote) => { - await this.addDebitNote(debitNote); - }); - this.cleanup = () => { - invoiceSubscription.unsubscribe(); - debitNoteSubscription.unsubscribe(); - }; + return this.cleanupSubscriptions !== null; } public stop(): void { - if (!this.isStarted()) { - throw new GolemUserError("Payment process not started"); - } - this.cleanup?.(); + this.cleanupSubscriptions(); } }