diff --git a/src/agreement/agreement.ts b/src/agreement/agreement.ts index 0587f43f9..231e5fbf9 100644 --- a/src/agreement/agreement.ts +++ b/src/agreement/agreement.ts @@ -101,11 +101,13 @@ export class Agreement { /** * Confirm agreement and waits for provider approval * @description Blocking function waits till agreement will be confirmed and approved by provider - * @throws Error if the agreement will be rejected by provider or failed to confirm + * + * @param appSessionId - Optional correlation/session identifier used for querying events + * related to this agreement */ - async confirm() { + async confirm(appSessionId?: string) { try { - await this.yagnaApi.market.confirmAgreement(this.id); + await this.yagnaApi.market.confirmAgreement(this.id, appSessionId); await this.yagnaApi.market.waitForApproval(this.id, this.options.agreementWaitingForApprovalTimeout); this.logger?.debug(`Agreement ${this.id} approved`); this.options.eventTarget?.dispatchEvent( @@ -144,7 +146,7 @@ export class Agreement { timeout: this.options.agreementRequestTimeout, }); this.options.eventTarget?.dispatchEvent( - new Events.AgreementTerminated({ id: this.id, providerId: this.provider.id }), + new Events.AgreementTerminated({ id: this.id, providerId: this.provider.id, reason: reason.message }), ); this.logger?.debug(`Agreement ${this.id} terminated`); } catch (error) { diff --git a/src/agreement/config.ts b/src/agreement/config.ts index 415a8cad0..ba57dba80 100644 --- a/src/agreement/config.ts +++ b/src/agreement/config.ts @@ -7,6 +7,8 @@ const DEFAULTS = { agreementRequestTimeout: 30000, agreementWaitingForApprovalTimeout: 60, agreementSelector: randomAgreementSelectorWithPriorityForExistingOnes(), + agreementMaxEvents: 100, + agreementEventsFetchingIntervalSec: 5, }; /** @@ -32,9 +34,14 @@ export class AgreementConfig { */ export class AgreementServiceConfig extends AgreementConfig { readonly agreementSelector: AgreementSelector; + readonly agreementMaxEvents: number; + readonly agreementEventsFetchingIntervalSec: number; constructor(options?: AgreementServiceOptions) { super(options); this.agreementSelector = options?.agreementSelector ?? DEFAULTS.agreementSelector; + this.agreementMaxEvents = options?.agreementMaxEvents ?? DEFAULTS.agreementMaxEvents; + this.agreementEventsFetchingIntervalSec = + options?.agreementEventsFetchingIntervalSec ?? DEFAULTS.agreementEventsFetchingIntervalSec; } } diff --git a/src/agreement/service.ts b/src/agreement/service.ts index b4e9466ea..1ccf884b4 100644 --- a/src/agreement/service.ts +++ b/src/agreement/service.ts @@ -1,10 +1,9 @@ import Bottleneck from "bottleneck"; -import { Logger } from "../utils"; +import { Logger, YagnaApi, sleep } from "../utils"; import { Agreement, AgreementOptions, AgreementStateEnum } from "./agreement"; import { AgreementServiceConfig } from "./config"; import { Proposal } from "../market"; -import sleep from "../utils/sleep"; -import { YagnaApi } from "../utils/yagna/yagna"; +import { AgreementEvent, AgreementTerminatedEvent } from "ya-ts-client/dist/ya-market"; import { GolemError } from "../error/golem-error"; export interface AgreementDTO { @@ -22,6 +21,10 @@ export type AgreementSelector = (candidates: AgreementCandidate[]) => Promise(); private candidateMap = new Map(); private agreements = new Map(); - private isServiceRunning = false; private limiter: Bottleneck; @@ -46,7 +47,6 @@ export class AgreementPoolService { ) { this.config = new AgreementServiceConfig(agreementServiceOptions); this.logger = agreementServiceOptions?.logger; - this.limiter = new Bottleneck({ maxConcurrent: 1, }); @@ -57,6 +57,7 @@ export class AgreementPoolService { */ async run() { this.isServiceRunning = true; + this.subscribeForAgreementEvents().catch(this.logger?.warn); this.logger?.debug("Agreement Pool Service has started"); } @@ -83,16 +84,17 @@ export class AgreementPoolService { this.logger?.debug(`Agreement ${agreementId} has been released for reuse`); return; } else { - this.logger?.warn(`Agreement ${agreementId} not found in the pool`); + this.logger?.debug(`Agreement ${agreementId} not found in the pool`); } } else { const agreement = this.agreements.get(agreementId); if (!agreement) { - this.logger?.warn(`Agreement ${agreementId} not found in the pool`); + this.logger?.debug(`Agreement ${agreementId} not found in the pool`); return; } this.logger?.debug(`Agreement ${agreementId} has been released and will be terminated`); try { + this.removeAgreementFromPool(agreement); await agreement.terminate(); } catch (e) { this.logger?.warn(`Unable to terminate agreement ${agreement.id}: ${e.message}`); @@ -204,11 +206,54 @@ export class AgreementPoolService { const state = await agreement.getState(); if (state === AgreementStateEnum.Proposal) { - await agreement.confirm(); + await agreement.confirm(this.yagnaApi.appSessionId); this.logger?.debug(`Agreement proposed to provider '${agreement.provider.name}'`); } await this.yagnaApi.market.waitForApproval(agreement.id, this.config.agreementWaitingForApprovalTimeout); return agreement; } + + private async subscribeForAgreementEvents() { + let afterTimestamp: string | undefined; + while (this.isServiceRunning) { + try { + // @ts-expect-error Bug in ts-client typing + const { data: events }: { data: Array } = + await this.yagnaApi.market.collectAgreementEvents( + this.config.agreementEventsFetchingIntervalSec, + afterTimestamp, + this.config.agreementMaxEvents, + this.yagnaApi.appSessionId, + ); + events.forEach((event) => { + afterTimestamp = event.eventDate; + // @ts-expect-error: Bug in ya-tsclient: typo in eventtype + if (event.eventtype === "AgreementTerminatedEvent") { + this.handleTerminationAgreementEvent(event.agreementId, event.reason); + } + }); + } catch (error) { + this.logger?.debug(`Unable to get agreement events. ${error}`); + await sleep(2); + } + } + } + + private async handleTerminationAgreementEvent(agreementId: string, reason?: { [key: string]: string }) { + const agreement = this.agreements.get(agreementId); + if (agreement) { + await agreement.terminate(reason); + this.removeAgreementFromPool(agreement); + } + } + + private removeAgreementFromPool(agreement: Agreement) { + this.agreements.delete(agreement.id); + const candidate = this.candidateMap.get(agreement.id); + if (candidate) { + this.pool.delete(candidate); + this.candidateMap.delete(agreement.id); + } + } } diff --git a/src/payment/payments.ts b/src/payment/payments.ts index 45d2c889e..dfdf8b3b5 100644 --- a/src/payment/payments.ts +++ b/src/payment/payments.ts @@ -57,7 +57,7 @@ export class Payments extends EventTarget { this.options.invoiceFetchingInterval / 1000, this.lastInvoiceFetchingTime, this.options.maxInvoiceEvents, - undefined, + this.yagnaApi.appSessionId, { timeout: 0 }, ); for (const event of invoiceEvents) { @@ -91,10 +91,11 @@ export class Payments extends EventTarget { this.options.debitNotesFetchingInterval / 1000, this.lastDebitNotesFetchingTime, this.options.maxDebitNotesEvents, - undefined, + this.yagnaApi.appSessionId, { timeout: 0 }, ) .catch(() => ({ data: [] })); + for (const event of debitNotesEvents) { if (!this.isRunning) return; if (event.eventType !== "DebitNoteReceivedEvent") continue; diff --git a/src/payment/rejection.ts b/src/payment/rejection.ts index 33483a84f..30b9e2e11 100644 --- a/src/payment/rejection.ts +++ b/src/payment/rejection.ts @@ -5,6 +5,12 @@ export enum RejectionReason { UnsolicitedService = "UNSOLICITED_SERVICE", BadService = "BAD_SERVICE", IncorrectAmount = "INCORRECT_AMOUNT", + /** + * We might get a debit note related to an agreement which is already covered with + * a final invoice. In such cases we don't want to pay for the debit note, + * as the payment will be already made when we accept the invoice. + */ + NonPayableAgreement = "NON_PAYABLE_AGREEMENT", } /** diff --git a/src/payment/service.ts b/src/payment/service.ts index 92d164dae..40b69dde8 100644 --- a/src/payment/service.ts +++ b/src/payment/service.ts @@ -146,6 +146,21 @@ export class PaymentService { private async processDebitNote(debitNote: DebitNote) { try { if (this.paidDebitNotes.has(debitNote.id)) return; + + if (!this.agreementsToPay.has(debitNote.agreementId)) { + const reason = { + rejectionReason: RejectionReason.NonPayableAgreement, + totalAmountAccepted: "0", + message: + "DebitNote rejected because the agreement is already covered with a final invoice that should be paid instead of the debit note", + }; + await debitNote.reject(reason); + this.logger?.warn( + `DebitNote has been rejected for agreement ${debitNote.agreementId}. Reason: ${reason.message}`, + ); + return; + } + if (await this.config.debitNoteFilter(debitNote.dto)) { await debitNote.accept(debitNote.totalAmountDue, this.allocation!.id); this.paidDebitNotes.add(debitNote.id); diff --git a/src/utils/yagna/yagna.ts b/src/utils/yagna/yagna.ts index 9f4cff861..ca075cdf6 100644 --- a/src/utils/yagna/yagna.ts +++ b/src/utils/yagna/yagna.ts @@ -8,6 +8,7 @@ import { Agent } from "http"; import { Configuration } from "ya-ts-client/dist/ya-payment"; import * as EnvUtils from "../env"; import { GolemError } from "../../error/golem-error"; +import { v4 } from "uuid"; export type YagnaApi = { market: MarketRequestorApi; @@ -17,6 +18,7 @@ export type YagnaApi = { identity: IdentityRequestorApi; gsb: GsbRequestorApi; yagnaOptions: YagnaOptions; + appSessionId: string; }; export type YagnaOptions = { @@ -70,6 +72,7 @@ export class Yagna { apiKey: this.apiKey, basePath: this.apiBaseUrl, }, + appSessionId: v4(), }; this.addErrorHandler(api); return api; diff --git a/tests/e2e/tasks.spec.ts b/tests/e2e/tasks.spec.ts index d3b9081f8..4272c36b4 100644 --- a/tests/e2e/tasks.spec.ts +++ b/tests/e2e/tasks.spec.ts @@ -1,6 +1,6 @@ import { LoggerMock } from "../mock"; import { readFileSync } from "fs"; -import { Result, TaskExecutor } from "../../src"; +import { TaskExecutor, EVENT_TYPE, BaseEvent, Events } from "../../src"; const logger = new LoggerMock(false); describe("Task Executor", function () { @@ -222,4 +222,74 @@ describe("Task Executor", function () { } expect(logger.logs).not.toContain("Trying to redo the task"); }); + + /** + * TODO: + * For the test to work properly, the midAgreementDebitNoteIntervalSec parameter (which is in the beta version) is needed, so we temporarily skip this test + */ + it.skip("should clean up the agreements in the pool if the agreement has been terminated by provider", async () => { + const eventTarget = new EventTarget(); + const executor = await TaskExecutor.create({ + package: "golem/alpine:latest", + eventTarget, + // we set mid-agreement payment and a filter that will not pay for debit notes + // which should result in termination of the agreement by provider + debitNotesFilter: () => Promise.resolve(false), + debitNotesAcceptanceTimeoutSec: 10, + midAgreementPaymentTimeoutSec: 10, + }); + let createdAgreementsCount = 0; + eventTarget.addEventListener(EVENT_TYPE, (event) => { + const ev = event as BaseEvent; + if (ev instanceof Events.AgreementCreated) createdAgreementsCount++; + }); + try { + await executor.run(async (ctx) => { + const proc = await ctx.spawn("timeout 15 ping 127.0.0.1"); + proc.stdout.on("data", (data) => console.log(data)); + return await proc.waitForExit(20_000); + }); + // the first task should be terminated by the provider, the second one should not use the same agreement + await executor.run(async (ctx) => console.log((await ctx.run("echo 'Hello World'")).stdout)); + } catch (error) { + throw new Error(`Test failed. ${error}`); + } finally { + await executor.shutdown(); + } + expect(createdAgreementsCount).toBeGreaterThan(1); + }); + + it("should only accept debit notes for agreements that were created by the executor", async () => { + const eventTarget1 = new EventTarget(); + const eventTarget2 = new EventTarget(); + const executor1 = await TaskExecutor.create("golem/alpine:latest"); + const executor2 = await TaskExecutor.create("golem/alpine:latest"); + const createdAgreementsIds1 = new Set(); + const createdAgreementsIds2 = new Set(); + const acceptedDebitNoteAgreementIds1 = new Set(); + const acceptedDebitNoteAgreementIds2 = new Set(); + eventTarget1.addEventListener(EVENT_TYPE, (event) => { + const ev = event as BaseEvent; + if (ev instanceof Events.AgreementCreated) createdAgreementsIds1.add(ev.detail.id); + if (ev instanceof Events.DebitNoteAccepted) acceptedDebitNoteAgreementIds1.add(ev.detail.agreementId); + }); + eventTarget2.addEventListener(EVENT_TYPE, (event) => { + const ev = event as BaseEvent; + if (ev instanceof Events.AgreementCreated) createdAgreementsIds2.add(ev.detail.id); + if (ev instanceof Events.DebitNoteAccepted) acceptedDebitNoteAgreementIds2.add(ev.detail.agreementId); + }); + try { + await Promise.all([ + executor1.run(async (ctx) => console.log((await ctx.run("echo 'Executor 1'")).stdout)), + executor2.run(async (ctx) => console.log((await ctx.run("echo 'Executor 2'")).stdout)), + ]); + } catch (error) { + throw new Error(`Test failed. ${error}`); + } finally { + await executor1.shutdown(); + await executor2.shutdown(); + } + expect(acceptedDebitNoteAgreementIds1).toEqual(createdAgreementsIds1); + expect(acceptedDebitNoteAgreementIds2).toEqual(createdAgreementsIds2); + }); }); diff --git a/tests/unit/payment_service.test.ts b/tests/unit/payment_service.test.ts index d520a9083..01060e12b 100644 --- a/tests/unit/payment_service.test.ts +++ b/tests/unit/payment_service.test.ts @@ -46,14 +46,21 @@ describe("Payment Service", () => { await paymentService.end(); }); + /** + * service.end() waits for invoices to be paid, in unit-tests that should be below 5s + */ + const TEST_PAYMENT_TIMEOUT_MS = 1000; + it("should accept and process debit note for agreement", async () => { const paymentService = new PaymentService(yagnaApi, { logger, + paymentTimeout: TEST_PAYMENT_TIMEOUT_MS, }); setExpectedEvents(debitNotesEvents); setExpectedDebitNotes(debitNotes); await paymentService.createAllocation(); await paymentService.run(); + await paymentService.acceptPayments(agreement); await paymentService.acceptDebitNotes(agreement.id); await logger.expectToInclude(`Debit Note accepted for agreement ${agreement.id}`, 100); await paymentService.end(); @@ -64,11 +71,13 @@ describe("Payment Service", () => { const paymentService = new PaymentService(yagnaApi, { logger, debitNotesFilter: alwaysRejectDebitNoteFilter, + paymentTimeout: TEST_PAYMENT_TIMEOUT_MS, }); setExpectedEvents(debitNotesEvents); setExpectedDebitNotes(debitNotes); await paymentService.createAllocation(); await paymentService.run(); + await paymentService.acceptPayments(agreement); await paymentService.acceptDebitNotes(agreement.id); await logger.expectToInclude( `DebitNote has been rejected for agreement ${agreement.id}. Reason: DebitNote rejected by DebitNote Filter`, @@ -77,6 +86,24 @@ describe("Payment Service", () => { await paymentService.end(); }); + it("should reject a debit note when the agreement is already covered with a final invoice", async () => { + const paymentService = new PaymentService(yagnaApi, { + logger, + }); + + setExpectedEvents([...invoiceEvents, ...debitNotesEvents]); + setExpectedDebitNotes(debitNotes); + setExpectedInvoices(invoices); + await paymentService.createAllocation(); + await paymentService.run(); + await paymentService.acceptDebitNotes(agreement.id); + await logger.expectToInclude( + `DebitNote has been rejected for agreement ${agreement.id}. Reason: DebitNote rejected because the agreement is already covered with a final invoice that should be paid instead of the debit note`, + 100, + ); + await paymentService.end(); + }); + it("should reject when invoice rejected by Invoice Filter", async () => { const alwaysRejectInvoiceFilter = async () => false; const paymentService = new PaymentService(yagnaApi, { @@ -100,11 +127,13 @@ describe("Payment Service", () => { const paymentService = new PaymentService(yagnaApi, { logger, debitNotesFilter: PaymentFilters.acceptMaxAmountDebitNoteFilter(0.00001), + paymentTimeout: TEST_PAYMENT_TIMEOUT_MS, }); setExpectedEvents(debitNotesEvents); setExpectedDebitNotes(debitNotes); await paymentService.createAllocation(); await paymentService.run(); + await paymentService.acceptPayments(agreement); await paymentService.acceptDebitNotes(agreement.id); await logger.expectToInclude( `DebitNote has been rejected for agreement ${agreement.id}. Reason: DebitNote rejected by DebitNote Filter`, @@ -135,11 +164,13 @@ describe("Payment Service", () => { const paymentService = new PaymentService(yagnaApi, { logger, debitNotesFilter: PaymentFilters.acceptMaxAmountDebitNoteFilter(7), + paymentTimeout: TEST_PAYMENT_TIMEOUT_MS, }); setExpectedEvents(debitNotesEvents); setExpectedDebitNotes(debitNotes); await paymentService.createAllocation(); await paymentService.run(); + await paymentService.acceptPayments(agreement); await paymentService.acceptDebitNotes(agreement.id); await logger.expectToInclude(`Debit Note accepted for agreement ${agreement.id}`, 100); await paymentService.end();