Skip to content

Commit

Permalink
feat(yagna): added behaviour subjects for events in yagna-api to supp…
Browse files Browse the repository at this point in the history
…ort basic event-bus cases

* refactor: switched the existing codebase to the subjects
* refactor: added graceful shutdown behaviour for yagna.disconnect
* chore: applied remark from CR
  • Loading branch information
grisha87 authored Apr 17, 2024
1 parent 71f462e commit a61115b
Show file tree
Hide file tree
Showing 12 changed files with 319 additions and 137 deletions.
13 changes: 7 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,12 @@
"generic-pool": "^3.9.0",
"ip-num": "^1.5.1",
"js-sha3": "^0.9.3",
"rxjs": "^7.8.1",
"semver": "^7.5.4",
"tmp": "^0.2.2",
"uuid": "^9.0.1",
"ws": "^8.16.0",
"ya-ts-client": "^1.0.0"
"ya-ts-client": "^1.1.1-beta.1"
},
"devDependencies": {
"@commitlint/cli": "^19.0.3",
Expand Down
1 change: 1 addition & 0 deletions src/activity/work/pool-old.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ export class ActivityPool {
this.paymentService.end(),
]);
await this.pool.clear();
await this.yagnaApi.disconnect();
this.events.emit("end");
}

Expand Down
32 changes: 9 additions & 23 deletions src/agreement/service.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import Bottleneck from "bottleneck";
import { Logger, YagnaApi, defaultLogger, sleep } from "../shared/utils";
import { defaultLogger, Logger, sleep, YagnaApi } from "../shared/utils";
import { Agreement, AgreementOptions } from "./agreement";
import { AgreementServiceConfig } from "./config";
import { GolemMarketError, MarketErrorCode, Proposal } from "../market";
import { MarketApi } from "ya-ts-client";

export interface AgreementDTO {
id: string;
Expand Down Expand Up @@ -231,30 +230,17 @@ export class AgreementPoolService {
}

private async subscribeForAgreementEvents() {
let afterTimestamp: string | undefined;
while (this.isServiceRunning) {
try {
const events = (await this.yagnaApi.market.collectAgreementEvents(
this.config.agreementEventsFetchingIntervalSec,
afterTimestamp,
this.config.agreementMaxEvents,
this.yagnaApi.appSessionId,
)) as Array<MarketApi.AgreementEventDTO & MarketApi.AgreementTerminatedEventDTO>;
events.forEach((event) => {
afterTimestamp = event.eventDate;
// @ts-expect-error: Bug in ya-tsclient: typo in eventtype #FIXME - report to core
if (event.eventtype === "AgreementTerminatedEvent") {
this.handleTerminationAgreementEvent(event.agreementId, event.reason);
}
});
} catch (error) {
this.logger.debug(`Unable to get agreement events.`, error);
await sleep(2);
this.yagnaApi.agreementEvents$.subscribe((event) => {
this.logger.debug("Received agreement operation event", { event });
if (event) {
if (event.eventType === "AgreementTerminatedEvent" && "reason" in event) {
this.handleTerminationAgreementEvent(event.agreementId, event.reason);
}
}
}
});
}

private async handleTerminationAgreementEvent(agreementId: string, reason?: { [key: string]: string }) {
private async handleTerminationAgreementEvent(agreementId: string, reason?: Record<string, string>) {
const agreement = this.agreements.get(agreementId);
if (agreement) {
await agreement.terminate(reason);
Expand Down
1 change: 1 addition & 0 deletions src/experimental/job/job_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export class JobManager {
public async close() {
const pendingJobs = Array.from(this.jobs.values()).filter((job) => job.isRunning());
await Promise.allSettled(pendingJobs.map((job) => job.cancel()));
await this.yagna?.disconnect();
this.yagna = null;
}

Expand Down
1 change: 1 addition & 0 deletions src/golem-network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export class GolemNetwork {
* @return Resolves when all shutdown steps are completed
*/
async disconnect() {
await this.yagna.disconnect();
this.events.emit("disconnected");
}

Expand Down
131 changes: 54 additions & 77 deletions src/payment/payments.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { BasePaymentOptions, PaymentConfig } from "./config";
import { Logger, sleep, YagnaApi } from "../shared/utils";
import { Logger, YagnaApi } from "../shared/utils";
import { Invoice } from "./invoice";
import { DebitNote } from "./debit_note";
import { GolemTimeoutError } from "../shared/error/golem-error";
import { EventEmitter } from "eventemitter3";
import { Subscription } from "rxjs";

export interface PaymentEvents {
invoiceReceived: (invoice: Invoice) => void;
Expand All @@ -19,13 +20,14 @@ export interface PaymentOptions extends BasePaymentOptions {
}

export class Payments {
private isRunning = true;
private options: PaymentConfig;
private logger: Logger;
private lastInvoiceFetchingTime: string = new Date().toISOString();
private lastDebitNotesFetchingTime: string = new Date().toISOString();

public readonly events = new EventEmitter<PaymentEvents>();

private debitNoteSubscription: Subscription | null = null;
private invoiceSubscription: Subscription | null = null;

static async create(yagnaApi: YagnaApi, options?: PaymentOptions) {
return new Payments(yagnaApi, new PaymentConfig(options));
}
Expand Down Expand Up @@ -54,91 +56,66 @@ export class Payments {
),
this.options.unsubscribeTimeoutMs,
);
this.events.on("unsubscribed", () => {
this.logger.debug(`Payments unsubscribed`);
clearTimeout(timeoutId);
resolve(true);
});
this.isRunning = false;

this.debitNoteSubscription?.unsubscribe();
this.invoiceSubscription?.unsubscribe();

clearTimeout(timeoutId);

resolve(true);
});
}

private async subscribe() {
this.subscribeForInvoices().catch((error) => this.logger.error(`Unable to collect invoices.`, { error }));
this.subscribeForDebitNotes().catch((error) => this.logger.error(`Unable to collect debit notes.`, { error }));
this.subscribeForInvoices();
this.subscribeForDebitNotes();
}

private async subscribeForInvoices() {
while (this.isRunning) {
try {
const invoiceEvents = await this.yagnaApi.payment.getInvoiceEvents(
this.options.invoiceFetchingInterval / 1000,
this.lastInvoiceFetchingTime,
this.options.maxInvoiceEvents,
this.yagnaApi.appSessionId,
);
for (const event of invoiceEvents) {
if (!this.isRunning) break;
if (event.eventType !== "InvoiceReceivedEvent") continue;
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore FIXME: ya-ts-client does not provide invoiceId in the event even though it is in the API response
const invoiceId = event["invoiceId"];
const invoice = await Invoice.create(invoiceId, this.yagnaApi, { ...this.options }).catch((error) =>
this.logger.error(`Unable to create invoice`, { id: invoiceId, error }),
);
if (!invoice) continue;
this.events.emit("invoiceReceived", invoice);
this.lastInvoiceFetchingTime = event.eventDate;
this.logger.debug(`New Invoice received`, {
id: invoice.id,
agreementId: invoice.agreementId,
amount: invoice.amount,
});
private subscribeForInvoices() {
this.invoiceSubscription = this.yagnaApi.invoiceEvents$.subscribe(async (event) => {
this.logger.debug("Received invoice event from Yagna", { event });

if (event && event.eventType === "InvoiceReceivedEvent") {
if (event.invoiceId) {
try {
const invoice = await Invoice.create(event.invoiceId, this.yagnaApi, { ...this.options });
this.events.emit("invoiceReceived", invoice);
this.logger.debug(`New Invoice received`, {
id: invoice.id,
agreementId: invoice.agreementId,
amount: invoice.amount,
});
} catch (err) {
this.logger.error(`Unable to create invoice`, { event, err });
}
} else {
this.logger.warn("Received invoice event without invoice ID", { event });
}
} catch (error) {
const reason = error.response?.data?.message || error.message || error;
this.logger.error(`Unable to get invoices.`, { reason });
await sleep(2);
}
}
this.events.emit("unsubscribed");
});
}

private async subscribeForDebitNotes() {
while (this.isRunning) {
try {
const debitNotesEvents = await this.yagnaApi.payment
.getDebitNoteEvents(
this.options.debitNotesFetchingInterval / 1000,
this.lastDebitNotesFetchingTime,
this.options.maxDebitNotesEvents,
this.yagnaApi.appSessionId,
)
.catch(() => []);

for (const event of debitNotesEvents) {
if (!this.isRunning) return;
if (event.eventType !== "DebitNoteReceivedEvent") continue;
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore FIXME: ya-ts-client does not provide debitNoteId in the event even though it is in the API response
const debitNoteId = event["debitNoteId"];
const debitNote = await DebitNote.create(debitNoteId, this.yagnaApi, { ...this.options }).catch((error) =>
this.logger.error(`Unable to create debit note`, { id: debitNoteId, error }),
);
if (!debitNote) continue;
this.events.emit("debitNoteReceived", debitNote);
this.lastDebitNotesFetchingTime = event.eventDate;
this.logger.debug("New Debit Note received", {
agreementId: debitNote.agreementId,
amount: debitNote.totalAmountDue,
});
private subscribeForDebitNotes() {
this.debitNoteSubscription = this.yagnaApi.debitNoteEvents$.subscribe(async (event) => {
this.logger.debug("Received debit note event from Yagna", { event });

if (event && event.eventType === "DebitNoteReceivedEvent") {
if (event.debitNoteId) {
try {
const debitNote = await DebitNote.create(event.debitNoteId, this.yagnaApi, { ...this.options });
this.events.emit("debitNoteReceived", debitNote);
this.logger.debug("New Debit Note received", {
agreementId: debitNote.agreementId,
amount: debitNote.totalAmountDue,
});
} catch (err) {
this.logger.error(`Unable to create debit note`, { event, err });
}
} else {
this.logger.warn("Received debit note event without debit note ID", { event });
}
} catch (error) {
const reason = error.response?.data?.message || error;
this.logger.error(`Unable to get debit notes.`, { reason });
await sleep(2);
}
}
});
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/shared/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import sleep from "./sleep";

export { sleep };
export * as runtimeContextChecker from "./runtimeContextChecker";
export { Logger } from "./logger/logger";
export { nullLogger } from "./logger/nullLogger";
export { defaultLogger } from "./logger/defaultLogger";
export * as EnvUtils from "./env";
export { YagnaApi, YagnaOptions } from "./yagna/yagnaApi";
export { YagnaEventSubscription } from "./yagna/yagnaApi";
export { YagnaApi, YagnaOptions, YagnaEventSubscription } from "./yagna/yagnaApi";
40 changes: 40 additions & 0 deletions src/shared/utils/waitForCondition.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { clearInterval } from "node:timers";
import { GolemTimeoutError } from "../error/golem-error";

/**
* Utility function that helps to block the execution until a condition is met (check returns true) or the timeout happens.
*
* @param {function} check - The function checking if the condition is met.
* @param {Object} [opts] - Options controlling the timeout and check interval in seconds.
* @param {number} [opts.timeoutSeconds=15] - The timeout value in seconds. Default is 15 seconds.
* @param {number} [opts.intervalSeconds=1] - The interval between condition checks in seconds. Default is 1 second.
*
* @return {Promise<void>} - Resolves when the condition is met or rejects with a timeout error if it wasn't met on time.
*/
export function waitForCondition(
check: () => boolean,
opts = { timeoutSeconds: 15, intervalSeconds: 1 },
): Promise<void> {
let verifyInterval: NodeJS.Timeout | undefined;
let waitTimeout: NodeJS.Timeout | undefined;

const verify = new Promise<void>((resolve) => {
verifyInterval = setInterval(() => {
if (check()) {
clearInterval(verifyInterval);
resolve();
}
}, opts.intervalSeconds * 1000);
});

const wait = new Promise<void>((_, reject) => {
waitTimeout = setTimeout(() => {
reject(new GolemTimeoutError(`Condition was not met within ${opts.timeoutSeconds}s`));
}, opts.timeoutSeconds * 1000);
});

return Promise.race([verify, wait]).finally(() => {
clearInterval(verifyInterval);
clearTimeout(waitTimeout);
});
}
Loading

0 comments on commit a61115b

Please sign in to comment.