Skip to content

Commit

Permalink
Merge pull request #988 from golemfactory/feature/JST-996/shorten-glm…
Browse files Browse the repository at this point in the history
…-disconnect

Speed up `glm.disconnect`
  • Loading branch information
SewerynKras authored Jun 21, 2024
2 parents 0909f3b + d13da74 commit eb9a4df
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 38 deletions.
1 change: 0 additions & 1 deletion src/golem-network/golem-network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ export class GolemNetwork {
async disconnect() {
await Promise.allSettled(this.cleanupTasks.map((task) => task()));
await this.storageProvider.close();
await this.services.paymentApi.disconnect();
await this.yagna.disconnect();

this.services.proposalCache.flushAll();
Expand Down
3 changes: 0 additions & 3 deletions src/payment/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ export interface IPaymentApi {
/** Starts the reader logic */
connect(): Promise<void>;

/** Terminates the reader logic */
disconnect(): Promise<void>;

getInvoice(id: string): Promise<Invoice>;

acceptInvoice(invoice: Invoice, allocation: Allocation, amount: string): Promise<Invoice>;
Expand Down
10 changes: 6 additions & 4 deletions src/shared/utils/wait.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import { GolemTimeoutError } from "../error/golem-error";
*/
export function waitForCondition(
check: () => boolean | Promise<boolean>,
opts = { timeoutSeconds: 30, intervalSeconds: 1 },
opts?: { timeoutSeconds?: number; intervalSeconds?: number },
): Promise<void> {
const timeoutSeconds = opts?.timeoutSeconds ?? 30;
const intervalSeconds = opts?.intervalSeconds ?? 1;
let verifyInterval: NodeJS.Timeout | undefined;
let waitTimeout: NodeJS.Timeout | undefined;

Expand All @@ -22,13 +24,13 @@ export function waitForCondition(
if (await check()) {
resolve();
}
}, opts.intervalSeconds * 1000);
}, intervalSeconds * 1000);
});

const wait = new Promise<void>((_, reject) => {
waitTimeout = setTimeout(() => {
reject(new GolemTimeoutError(`Condition was not met within ${opts.timeoutSeconds}s`));
}, opts.timeoutSeconds * 1000);
reject(new GolemTimeoutError(`Condition was not met within ${timeoutSeconds}s`));
}, timeoutSeconds * 1000);
});

return Promise.race([verify, wait]).finally(() => {
Expand Down
5 changes: 0 additions & 5 deletions src/shared/yagna/adapters/payment-api-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ export class PaymentApiAdapter implements IPaymentApi {
return this.debitNoteRepo.getById(id);
}

async disconnect() {
this.logger.debug("Disconnecting Payment API Adapter");
this.logger.debug("Payment API Adapter disconnected");
}

async acceptInvoice(invoice: Invoice, allocation: Allocation, amount: string): Promise<Invoice> {
try {
await this.yagna.payment.acceptInvoice(invoice.id, {
Expand Down
40 changes: 21 additions & 19 deletions src/shared/yagna/event-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,22 @@ export type CancellablePoll<T> = {
/** User defined name of the event stream for ease of debugging */
eventType: string;

/** Tells if a poll call is currently active - reader */
isBusy: boolean;

/** Tells if the poll is active in general. If it's 'false' it means that the poll was cancelled and no polling attempts will be done any more */
isOnline: boolean;
/** Flag indicating if the reader is finished and no longer polling */
isFinished: boolean;

/** Triggers the poll using the fetcher provided when the CancellablePoll was created */
pollValues: () => AsyncGenerator<T>;

/**
* Cancels the polling operations, stopping the reader
*
* It will wait for the last read to complete and will take the reader offline
*/
cancel: () => Promise<void>;
};

export type EventsFetcherWithCursor<T extends EventDTO> = (lastEventTimestamp: string) => Promise<T[]>;
type CancellablePromise<T> = Promise<T> & { cancel: () => void };
export type CancellableEventsFetcherWithCursor<T extends EventDTO> = (
lastEventTimestamp: string,
) => CancellablePromise<T[]>;

export class EventReader {
public constructor(private readonly logger: Logger) {}
Expand All @@ -39,24 +37,23 @@ export class EventReader {

public createReader<T extends EventDTO>(
eventType: string,
eventsFetcher: EventsFetcherWithCursor<T>,
eventsFetcher: CancellableEventsFetcherWithCursor<T>,
): CancellablePoll<T> {
let isBusy = false;
let isOnline = true;
let isFinished = false;
let keepReading = true;
let currentPoll: CancellablePromise<T[]> | null = null;
let lastTimestamp = new Date().toISOString();

const logger = this.logger;

return {
eventType,
isBusy,
isOnline,
isFinished,
pollValues: async function* () {
while (keepReading) {
try {
isBusy = true;
const events = await eventsFetcher(lastTimestamp);
currentPoll = eventsFetcher(lastTimestamp);
const events = await currentPoll;
logger.debug("Polled events from Yagna", {
eventType,
count: events.length,
Expand All @@ -67,17 +64,22 @@ export class EventReader {
lastTimestamp = event.eventDate;
}
} catch (error) {
if (typeof error === "object" && error.name === "CancelError") {
logger.debug("Polling was cancelled", { eventType });
continue;
}
logger.error("Error fetching events from Yagna", { eventType, error });
} finally {
isBusy = false;
}
}
logger.debug("Stopped reading events", { eventType });
isOnline = false;
isFinished = true;
},
cancel: async function () {
keepReading = false;
await waitForCondition(() => !isBusy && !isOnline);
if (currentPoll) {
currentPoll.cancel();
}
await waitForCondition(() => isFinished, { intervalSeconds: 0 });
logger.debug("Cancelled reading the events", { eventType });
},
};
Expand Down
2 changes: 1 addition & 1 deletion src/shared/yagna/yagna.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ describe("Yagna Utils", () => {

await expect(() => y.connect()).rejects.toMatchError(
new GolemPlatformError(
`Unreadable yana version 'broken'. Can't proceed without checking yagna version support status.`,
`Unreadable yagna version 'broken'. Can't proceed without checking yagna version support status.`,
),
);
});
Expand Down
12 changes: 7 additions & 5 deletions src/shared/yagna/yagnaApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,19 +246,21 @@ export class YagnaApi {
private async stopPollingEvents() {
this.logger.debug("Stopping polling events from Yagna");

const promises: Promise<void>[] = [];
if (this.invoiceEventPoll) {
await this.invoiceEventPoll.cancel();
promises.push(this.invoiceEventPoll.cancel());
}

if (this.debitNoteEventsPoll) {
await this.debitNoteEventsPoll.cancel();
promises.push(this.debitNoteEventsPoll.cancel());
}

if (this.agreementEventsPoll) {
await this.agreementEventsPoll.cancel();
promises.push(this.agreementEventsPoll.cancel());
}
await Promise.allSettled(promises);

this.logger.debug("Stopped polling events form Yagna");
this.logger.debug("Stopped polling events from Yagna");
}

private async assertSupportedVersion() {
Expand All @@ -272,7 +274,7 @@ export class YagnaApi {

if (!normVersion) {
throw new GolemPlatformError(
`Unreadable yana version '${version}'. Can't proceed without checking yagna version support status.`,
`Unreadable yagna version '${version}'. Can't proceed without checking yagna version support status.`,
);
}

Expand Down

0 comments on commit eb9a4df

Please sign in to comment.