Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into beta
Browse files Browse the repository at this point in the history
  • Loading branch information
grisha87 committed Dec 18, 2023
2 parents 2323dd9 + 1175b1c commit 0fd595a
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 16 deletions.
10 changes: 6 additions & 4 deletions src/agreement/agreement.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,13 @@ export class Agreement {
/**
* Confirm agreement and waits for provider approval
* @description Blocking function waits till agreement will be confirmed and approved by provider
* @throws Error if the agreement will be rejected by provider or failed to confirm
*
* @param appSessionId - Optional correlation/session identifier used for querying events
* related to this agreement
*/
async confirm() {
async confirm(appSessionId?: string) {
try {
await this.yagnaApi.market.confirmAgreement(this.id);
await this.yagnaApi.market.confirmAgreement(this.id, appSessionId);
await this.yagnaApi.market.waitForApproval(this.id, this.options.agreementWaitingForApprovalTimeout);
this.logger?.debug(`Agreement ${this.id} approved`);
this.options.eventTarget?.dispatchEvent(
Expand Down Expand Up @@ -144,7 +146,7 @@ export class Agreement {
timeout: this.options.agreementRequestTimeout,
});
this.options.eventTarget?.dispatchEvent(
new Events.AgreementTerminated({ id: this.id, providerId: this.provider.id }),
new Events.AgreementTerminated({ id: this.id, providerId: this.provider.id, reason: reason.message }),
);
this.logger?.debug(`Agreement ${this.id} terminated`);
} catch (error) {
Expand Down
7 changes: 7 additions & 0 deletions src/agreement/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const DEFAULTS = {
agreementRequestTimeout: 30000,
agreementWaitingForApprovalTimeout: 60,
agreementSelector: randomAgreementSelectorWithPriorityForExistingOnes(),
agreementMaxEvents: 100,
agreementEventsFetchingIntervalSec: 5,
};

/**
Expand All @@ -32,9 +34,14 @@ export class AgreementConfig {
*/
export class AgreementServiceConfig extends AgreementConfig {
readonly agreementSelector: AgreementSelector;
readonly agreementMaxEvents: number;
readonly agreementEventsFetchingIntervalSec: number;

constructor(options?: AgreementServiceOptions) {
super(options);
this.agreementSelector = options?.agreementSelector ?? DEFAULTS.agreementSelector;
this.agreementMaxEvents = options?.agreementMaxEvents ?? DEFAULTS.agreementMaxEvents;
this.agreementEventsFetchingIntervalSec =
options?.agreementEventsFetchingIntervalSec ?? DEFAULTS.agreementEventsFetchingIntervalSec;
}
}
63 changes: 54 additions & 9 deletions src/agreement/service.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import Bottleneck from "bottleneck";
import { Logger } from "../utils";
import { Logger, YagnaApi, sleep } from "../utils";
import { Agreement, AgreementOptions, AgreementStateEnum } from "./agreement";
import { AgreementServiceConfig } from "./config";
import { Proposal } from "../market";
import sleep from "../utils/sleep";
import { YagnaApi } from "../utils/yagna/yagna";
import { AgreementEvent, AgreementTerminatedEvent } from "ya-ts-client/dist/ya-market";
import { GolemError } from "../error/golem-error";

export interface AgreementDTO {
Expand All @@ -22,6 +21,10 @@ export type AgreementSelector = (candidates: AgreementCandidate[]) => Promise<Ag
export interface AgreementServiceOptions extends AgreementOptions {
/** The selector used when choosing a provider from a pool of existing offers (from the market or already used before) */
agreementSelector?: AgreementSelector;
/** The maximum number of events fetched in one request call */
agreementMaxEvents?: number;
/** interval for fetching agreement events */
agreementEventsFetchingIntervalSec?: number;
}

/**
Expand All @@ -32,11 +35,9 @@ export interface AgreementServiceOptions extends AgreementOptions {
export class AgreementPoolService {
private logger?: Logger;
private config: AgreementServiceConfig;

private pool = new Set<AgreementCandidate>();
private candidateMap = new Map<string, AgreementCandidate>();
private agreements = new Map<string, Agreement>();

private isServiceRunning = false;
private limiter: Bottleneck;

Expand All @@ -46,7 +47,6 @@ export class AgreementPoolService {
) {
this.config = new AgreementServiceConfig(agreementServiceOptions);
this.logger = agreementServiceOptions?.logger;

this.limiter = new Bottleneck({
maxConcurrent: 1,
});
Expand All @@ -57,6 +57,7 @@ export class AgreementPoolService {
*/
async run() {
this.isServiceRunning = true;
this.subscribeForAgreementEvents().catch(this.logger?.warn);
this.logger?.debug("Agreement Pool Service has started");
}

Expand All @@ -83,16 +84,17 @@ export class AgreementPoolService {
this.logger?.debug(`Agreement ${agreementId} has been released for reuse`);
return;
} else {
this.logger?.warn(`Agreement ${agreementId} not found in the pool`);
this.logger?.debug(`Agreement ${agreementId} not found in the pool`);
}
} else {
const agreement = this.agreements.get(agreementId);
if (!agreement) {
this.logger?.warn(`Agreement ${agreementId} not found in the pool`);
this.logger?.debug(`Agreement ${agreementId} not found in the pool`);
return;
}
this.logger?.debug(`Agreement ${agreementId} has been released and will be terminated`);
try {
this.removeAgreementFromPool(agreement);
await agreement.terminate();
} catch (e) {
this.logger?.warn(`Unable to terminate agreement ${agreement.id}: ${e.message}`);
Expand Down Expand Up @@ -204,11 +206,54 @@ export class AgreementPoolService {
const state = await agreement.getState();

if (state === AgreementStateEnum.Proposal) {
await agreement.confirm();
await agreement.confirm(this.yagnaApi.appSessionId);
this.logger?.debug(`Agreement proposed to provider '${agreement.provider.name}'`);
}

await this.yagnaApi.market.waitForApproval(agreement.id, this.config.agreementWaitingForApprovalTimeout);
return agreement;
}

private async subscribeForAgreementEvents() {
let afterTimestamp: string | undefined;
while (this.isServiceRunning) {
try {
// @ts-expect-error Bug in ts-client typing
const { data: events }: { data: Array<AgreementEvent & AgreementTerminatedEvent> } =
await this.yagnaApi.market.collectAgreementEvents(
this.config.agreementEventsFetchingIntervalSec,
afterTimestamp,
this.config.agreementMaxEvents,
this.yagnaApi.appSessionId,
);
events.forEach((event) => {
afterTimestamp = event.eventDate;
// @ts-expect-error: Bug in ya-tsclient: typo in eventtype
if (event.eventtype === "AgreementTerminatedEvent") {
this.handleTerminationAgreementEvent(event.agreementId, event.reason);
}
});
} catch (error) {
this.logger?.debug(`Unable to get agreement events. ${error}`);
await sleep(2);
}
}
}

private async handleTerminationAgreementEvent(agreementId: string, reason?: { [key: string]: string }) {
const agreement = this.agreements.get(agreementId);
if (agreement) {
await agreement.terminate(reason);
this.removeAgreementFromPool(agreement);
}
}

private removeAgreementFromPool(agreement: Agreement) {
this.agreements.delete(agreement.id);
const candidate = this.candidateMap.get(agreement.id);
if (candidate) {
this.pool.delete(candidate);
this.candidateMap.delete(agreement.id);
}
}
}
5 changes: 3 additions & 2 deletions src/payment/payments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class Payments extends EventTarget {
this.options.invoiceFetchingInterval / 1000,
this.lastInvoiceFetchingTime,
this.options.maxInvoiceEvents,
undefined,
this.yagnaApi.appSessionId,
{ timeout: 0 },
);
for (const event of invoiceEvents) {
Expand Down Expand Up @@ -91,10 +91,11 @@ export class Payments extends EventTarget {
this.options.debitNotesFetchingInterval / 1000,
this.lastDebitNotesFetchingTime,
this.options.maxDebitNotesEvents,
undefined,
this.yagnaApi.appSessionId,
{ timeout: 0 },
)
.catch(() => ({ data: [] }));

for (const event of debitNotesEvents) {
if (!this.isRunning) return;
if (event.eventType !== "DebitNoteReceivedEvent") continue;
Expand Down
6 changes: 6 additions & 0 deletions src/payment/rejection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ export enum RejectionReason {
UnsolicitedService = "UNSOLICITED_SERVICE",
BadService = "BAD_SERVICE",
IncorrectAmount = "INCORRECT_AMOUNT",
/**
* We might get a debit note related to an agreement which is already covered with
* a final invoice. In such cases we don't want to pay for the debit note,
* as the payment will be already made when we accept the invoice.
*/
NonPayableAgreement = "NON_PAYABLE_AGREEMENT",
}

/**
Expand Down
15 changes: 15 additions & 0 deletions src/payment/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,21 @@ export class PaymentService {
private async processDebitNote(debitNote: DebitNote) {
try {
if (this.paidDebitNotes.has(debitNote.id)) return;

if (!this.agreementsToPay.has(debitNote.agreementId)) {
const reason = {
rejectionReason: RejectionReason.NonPayableAgreement,
totalAmountAccepted: "0",
message:
"DebitNote rejected because the agreement is already covered with a final invoice that should be paid instead of the debit note",
};
await debitNote.reject(reason);
this.logger?.warn(
`DebitNote has been rejected for agreement ${debitNote.agreementId}. Reason: ${reason.message}`,
);
return;
}

if (await this.config.debitNoteFilter(debitNote.dto)) {
await debitNote.accept(debitNote.totalAmountDue, this.allocation!.id);
this.paidDebitNotes.add(debitNote.id);
Expand Down
3 changes: 3 additions & 0 deletions src/utils/yagna/yagna.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Agent } from "http";
import { Configuration } from "ya-ts-client/dist/ya-payment";
import * as EnvUtils from "../env";
import { GolemError } from "../../error/golem-error";
import { v4 } from "uuid";

export type YagnaApi = {
market: MarketRequestorApi;
Expand All @@ -17,6 +18,7 @@ export type YagnaApi = {
identity: IdentityRequestorApi;
gsb: GsbRequestorApi;
yagnaOptions: YagnaOptions;
appSessionId: string;
};

export type YagnaOptions = {
Expand Down Expand Up @@ -70,6 +72,7 @@ export class Yagna {
apiKey: this.apiKey,
basePath: this.apiBaseUrl,
},
appSessionId: v4(),
};
this.addErrorHandler(api);
return api;
Expand Down
72 changes: 71 additions & 1 deletion tests/e2e/tasks.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { LoggerMock } from "../mock";
import { readFileSync } from "fs";
import { Result, TaskExecutor } from "../../src";
import { TaskExecutor, EVENT_TYPE, BaseEvent, Events } from "../../src";
const logger = new LoggerMock(false);

describe("Task Executor", function () {
Expand Down Expand Up @@ -222,4 +222,74 @@ describe("Task Executor", function () {
}
expect(logger.logs).not.toContain("Trying to redo the task");
});

/**
* TODO:
* For the test to work properly, the midAgreementDebitNoteIntervalSec parameter (which is in the beta version) is needed, so we temporarily skip this test
*/
it.skip("should clean up the agreements in the pool if the agreement has been terminated by provider", async () => {
const eventTarget = new EventTarget();
const executor = await TaskExecutor.create({
package: "golem/alpine:latest",
eventTarget,
// we set mid-agreement payment and a filter that will not pay for debit notes
// which should result in termination of the agreement by provider
debitNotesFilter: () => Promise.resolve(false),
debitNotesAcceptanceTimeoutSec: 10,
midAgreementPaymentTimeoutSec: 10,
});
let createdAgreementsCount = 0;
eventTarget.addEventListener(EVENT_TYPE, (event) => {
const ev = event as BaseEvent<unknown>;
if (ev instanceof Events.AgreementCreated) createdAgreementsCount++;
});
try {
await executor.run(async (ctx) => {
const proc = await ctx.spawn("timeout 15 ping 127.0.0.1");
proc.stdout.on("data", (data) => console.log(data));
return await proc.waitForExit(20_000);
});
// the first task should be terminated by the provider, the second one should not use the same agreement
await executor.run(async (ctx) => console.log((await ctx.run("echo 'Hello World'")).stdout));
} catch (error) {
throw new Error(`Test failed. ${error}`);
} finally {
await executor.shutdown();
}
expect(createdAgreementsCount).toBeGreaterThan(1);
});

it("should only accept debit notes for agreements that were created by the executor", async () => {
const eventTarget1 = new EventTarget();
const eventTarget2 = new EventTarget();
const executor1 = await TaskExecutor.create("golem/alpine:latest");
const executor2 = await TaskExecutor.create("golem/alpine:latest");
const createdAgreementsIds1 = new Set();
const createdAgreementsIds2 = new Set();
const acceptedDebitNoteAgreementIds1 = new Set();
const acceptedDebitNoteAgreementIds2 = new Set();
eventTarget1.addEventListener(EVENT_TYPE, (event) => {
const ev = event as BaseEvent<unknown>;
if (ev instanceof Events.AgreementCreated) createdAgreementsIds1.add(ev.detail.id);
if (ev instanceof Events.DebitNoteAccepted) acceptedDebitNoteAgreementIds1.add(ev.detail.agreementId);
});
eventTarget2.addEventListener(EVENT_TYPE, (event) => {
const ev = event as BaseEvent<unknown>;
if (ev instanceof Events.AgreementCreated) createdAgreementsIds2.add(ev.detail.id);
if (ev instanceof Events.DebitNoteAccepted) acceptedDebitNoteAgreementIds2.add(ev.detail.agreementId);
});
try {
await Promise.all([
executor1.run(async (ctx) => console.log((await ctx.run("echo 'Executor 1'")).stdout)),
executor2.run(async (ctx) => console.log((await ctx.run("echo 'Executor 2'")).stdout)),
]);
} catch (error) {
throw new Error(`Test failed. ${error}`);
} finally {
await executor1.shutdown();
await executor2.shutdown();
}
expect(acceptedDebitNoteAgreementIds1).toEqual(createdAgreementsIds1);
expect(acceptedDebitNoteAgreementIds2).toEqual(createdAgreementsIds2);
});
});
Loading

0 comments on commit 0fd595a

Please sign in to comment.