Skip to content

Commit

Permalink
feat: remove start method from agreement payment process and start it…
Browse files Browse the repository at this point in the history
… automatically
  • Loading branch information
SewerynKras committed May 29, 2024
1 parent e3b384b commit 4eb8a12
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 39 deletions.
6 changes: 1 addition & 5 deletions src/lease-process/lease.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { LeaseProcessPool, LeaseProcessPoolOptions } from "./lease-process-pool"
export interface LeaseModule {
/**
* Factory that creates a new lease process that's fully configured.
* This method will also create and start the payment process for the agreement.
* This method will also create the payment process for the agreement.
*
*/
createLease(agreement: Agreement, allocation: Allocation, options?: LeaseProcessOptions): LeaseProcess;
Expand Down Expand Up @@ -51,10 +51,6 @@ export class LeaseModuleImpl implements LeaseModule {
this.deps.logger.child("lease-process"),
options,
);
paymentProcess.start();
lease.events.once("finalized", () => {
paymentProcess.stop();
});
return lease;
}

Expand Down
3 changes: 3 additions & 0 deletions src/payment/agreement_payment_process.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { DebitNote } from "./debit_note";
import { GolemPaymentError, PaymentErrorCode } from "./error";
import { GolemUserError } from "../shared/error/golem-error";
import { IPaymentApi } from "./types";
import { Subject } from "rxjs";

const agreementMock = mock(Agreement);
const allocationMock = mock(Allocation);
Expand All @@ -30,6 +31,8 @@ beforeEach(() => {

when(agreementMock.getProviderInfo()).thenReturn(testProviderInfo);
when(invoiceMock.provider).thenReturn(testProviderInfo);
when(mockPaymentApi.receivedInvoices$).thenReturn(new Subject());
when(mockPaymentApi.receivedDebitNotes$).thenReturn(new Subject());
});

describe("AgreementPaymentProcess", () => {
Expand Down
59 changes: 25 additions & 34 deletions src/payment/agreement_payment_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ export interface PaymentProcessOptions {
}

/**
* Process manager that controls the logic behind processing events related to an agreement which result with payments
* Process manager that controls the logic behind processing payments for an agreement (debit notes and invoices).
* The process is started automatically and ends when the final invoice is received.
* You can stop the process earlier by calling the `stop` method. You cannot restart the process after stopping it.
*/
export class AgreementPaymentProcess {
private invoice: Invoice | null = null;
Expand All @@ -51,7 +53,7 @@ export class AgreementPaymentProcess {

public readonly logger: Logger;

private cleanup: (() => void) | null = null;
private readonly cleanupSubscriptions: () => void;

constructor(
public readonly agreement: Agreement,
Expand All @@ -65,6 +67,19 @@ export class AgreementPaymentProcess {
invoiceFilter: options?.invoiceFilter || (() => true),
debitNoteFilter: options?.debitNoteFilter || (() => true),
};

const invoiceSubscription = this.paymentApi.receivedInvoices$
.pipe(filter((invoice) => invoice.agreementId === this.agreement.id))
.subscribe((invoice) => this.addInvoice(invoice));

const debitNoteSubscription = this.paymentApi.receivedDebitNotes$
.pipe(filter((debitNote) => debitNote.agreementId === this.agreement.id))
.subscribe((debitNote) => this.addDebitNote(debitNote));

this.cleanupSubscriptions = () => {
invoiceSubscription.unsubscribe();
debitNoteSubscription.unsubscribe();
};
}

/**
Expand Down Expand Up @@ -191,6 +206,11 @@ export class AgreementPaymentProcess {
}
}

private finalize(invoice: Invoice) {
this.invoice = invoice;
this.cleanupSubscriptions();
}

private async applyInvoice(invoice: Invoice) {
this.logger.debug("Applying invoice for agreement", {
invoiceId: invoice.id,
Expand Down Expand Up @@ -218,7 +238,7 @@ export class AgreementPaymentProcess {
);
}

this.invoice = invoice;
this.finalize(invoice);

let acceptedByFilter = false;
try {
Expand Down Expand Up @@ -298,39 +318,10 @@ export class AgreementPaymentProcess {
}

public isStarted() {
return this.cleanup !== null;
}

/**
* Subscribe to payment events and add each invoice and debit note
* to the payment process.
* To stop the subscription, call `.stop()`.
*/
public start(): void {
if (this.isStarted()) {
throw new GolemUserError("Payment process already started");
}
const invoiceSubscription = this.paymentApi.receivedInvoices$
.pipe(filter((invoice) => invoice.agreementId === this.agreement.id))
.subscribe(async (invoice) => {
await this.addInvoice(invoice);
});

const debitNoteSubscription = this.paymentApi.receivedDebitNotes$
.pipe(filter((debitNote) => debitNote.agreementId === this.agreement.id))
.subscribe(async (debitNote) => {
await this.addDebitNote(debitNote);
});
this.cleanup = () => {
invoiceSubscription.unsubscribe();
debitNoteSubscription.unsubscribe();
};
return this.cleanupSubscriptions !== null;
}

public stop(): void {
if (!this.isStarted()) {
throw new GolemUserError("Payment process not started");
}
this.cleanup?.();
this.cleanupSubscriptions();
}
}

0 comments on commit 4eb8a12

Please sign in to comment.