Skip to content

Commit

Permalink
Merge pull request #1054 from golemfactory/bugfix/JST-1047/memory-leak
Browse files Browse the repository at this point in the history
fix(exe-unit): fixed exceeding max listeners when running multiple commands on the same exe unit
  • Loading branch information
grisha87 authored Sep 3, 2024
2 parents f79d8df + 31f01aa commit 28d634e
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 15 deletions.
6 changes: 3 additions & 3 deletions src/activity/exe-script-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { Result, StreamingBatchEvent } from "./results";
import { Activity } from "./activity";
import { getMessageFromApiError } from "../shared/utils/apiErrorMessage";
import { ActivityModule } from "./activity.module";
import { catchError, map, Observable, takeWhile } from "rxjs";
import { catchError, finalize, map, Observable, takeWhile } from "rxjs";

/**
* Information needed to fetch the results of a script execution
Expand Down Expand Up @@ -106,7 +106,7 @@ export class ExeScriptExecutor {
signalOrTimeout?: number | AbortSignal,
maxRetries?: number,
): Observable<Result> {
const signal = anyAbortSignal(this.abortSignal, createAbortSignalFromTimeout(signalOrTimeout));
const { signal, cleanup } = anyAbortSignal(this.abortSignal, createAbortSignalFromTimeout(signalOrTimeout));

// observable that emits when the script execution should be aborted
const abort$ = new Observable<never>((subscriber) => {
Expand All @@ -126,7 +126,7 @@ export class ExeScriptExecutor {
? this.streamingBatch(batch.batchId, batch.batchSize)
: this.pollingBatch(batch.batchId, maxRetries);

return mergeUntilFirstComplete(abort$, results$);
return mergeUntilFirstComplete(abort$, results$).pipe(finalize(cleanup));
}

protected async send(script: ExeScriptRequest): Promise<string> {
Expand Down
7 changes: 6 additions & 1 deletion src/golem-network/golem-network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,14 +428,19 @@ export class GolemNetwork {
* @param options.teardown - an optional function that is called before the exe unit is destroyed
*/
async oneOf({ order, setup, teardown, signalOrTimeout }: OneOfOptions): Promise<ResourceRental> {
const signal = anyAbortSignal(createAbortSignalFromTimeout(signalOrTimeout), this.abortController.signal);
const { signal, cleanup: cleanupAbortSignals } = anyAbortSignal(
createAbortSignalFromTimeout(signalOrTimeout),
this.abortController.signal,
);

let allocation: Allocation | undefined = undefined;
let proposalSubscription: Subscription | undefined = undefined;
let rental: ResourceRental | undefined = undefined;
let networkNode: NetworkNode | undefined = undefined;

const cleanup = async () => {
cleanupAbortSignals();

if (proposalSubscription) {
proposalSubscription.unsubscribe();
}
Expand Down
16 changes: 13 additions & 3 deletions src/resource-rental/resource-rental-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ export class ResourceRentalPool {

private async createNewResourceRental(signalOrTimeout?: number | AbortSignal) {
this.logger.debug("Creating new resource rental to add to pool");
const signal = anyAbortSignal(this.abortController.signal, createAbortSignalFromTimeout(signalOrTimeout));
const { signal, cleanup } = anyAbortSignal(
this.abortController.signal,
createAbortSignalFromTimeout(signalOrTimeout),
);

try {
this.rentalsBeingSigned++;
Expand Down Expand Up @@ -158,6 +161,7 @@ export class ResourceRentalPool {
throw error;
} finally {
this.rentalsBeingSigned--;
cleanup();
}
}

Expand Down Expand Up @@ -216,7 +220,7 @@ export class ResourceRentalPool {
*/
private async raceNewRentalWithAcquireQueue(signalOrTimeout?: number | AbortSignal) {
const ac = new AbortController();
const signal = anyAbortSignal(
const { signal, cleanup } = anyAbortSignal(
ac.signal,
createAbortSignalFromTimeout(signalOrTimeout),
this.abortController.signal,
Expand All @@ -235,6 +239,7 @@ export class ResourceRentalPool {
})
.finally(() => {
ac.abort();
cleanup();
});
}

Expand Down Expand Up @@ -405,7 +410,10 @@ export class ResourceRentalPool {
if (this.minPoolSize <= this.getAvailableSize()) {
return;
}
const signal = anyAbortSignal(this.abortController.signal, createAbortSignalFromTimeout(timeoutOrAbortSignal));
const { signal, cleanup } = anyAbortSignal(
this.abortController.signal,
createAbortSignalFromTimeout(timeoutOrAbortSignal),
);
const tryCreatingMissingResourceRentals = async () => {
await Promise.allSettled(
new Array(this.minPoolSize - this.getAvailableSize()).fill(0).map(() =>
Expand All @@ -424,6 +432,8 @@ export class ResourceRentalPool {
await runOnNextEventLoopIteration(tryCreatingMissingResourceRentals);
}

cleanup();

if (this.minPoolSize > this.getAvailableSize()) {
throw new Error("Could not create enough resource rentals to reach the minimum pool size in time");
}
Expand Down
31 changes: 25 additions & 6 deletions src/shared/utils/abortSignal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,44 @@ export function createAbortSignalFromTimeout(timeoutOrSignal: number | AbortSign
return new AbortController().signal;
}

interface AbortEvent extends Event {
target: EventTarget & { reason?: string | Error };
}

/**
* Combine multiple AbortSignals into a single signal that will be aborted if any
* of the input signals are aborted. If any of the input signals are already aborted,
* the returned signal will be aborted immediately.
*
* Polyfill for AbortSignal.any(), since it's only available starting in Node 20
* https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal/any_static
*
* The function returns a signal and a cleanup function that allows you
* to remove listeners when they are no longer needed.
*/
export function anyAbortSignal(...signals: AbortSignal[]) {

export function anyAbortSignal(...signals: AbortSignal[]): { signal: AbortSignal; cleanup: () => void } {
const controller = new AbortController();

const onAbort = (ev: Event) => {
if (controller.signal.aborted) return;
const reason = (ev as AbortEvent).target.reason;
controller.abort(reason);
};

for (const signal of signals) {
if (signal.aborted) {
controller.abort(signal.reason);
break;
}
signal.addEventListener("abort", () => {
if (controller.signal.aborted) return;
controller.abort(signal.reason);
});
signal.addEventListener("abort", onAbort);
}
return controller.signal;

const cleanup = () => {
for (const signal of signals) {
signal.removeEventListener("abort", onAbort);
}
};

return { signal: controller.signal, cleanup };
}
7 changes: 5 additions & 2 deletions src/shared/utils/acquireQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ export class AcquireQueue<T> {
* Queue up for the next available item.
*/
public async get(signalOrTimeout?: number | AbortSignal): Promise<T> {
const signal = anyAbortSignal(createAbortSignalFromTimeout(signalOrTimeout), this.abortController.signal);
const { signal, cleanup } = anyAbortSignal(
createAbortSignalFromTimeout(signalOrTimeout),
this.abortController.signal,
);
signal.throwIfAborted();
const { resolve, promise } = withResolvers<T>();
this.queue.push(resolve);
Expand All @@ -50,7 +53,7 @@ export class AcquireQueue<T> {
reject(signal.reason);
});
});
return Promise.race([promise, abortPromise]);
return Promise.race([promise, abortPromise]).finally(cleanup);
}

/**
Expand Down

0 comments on commit 28d634e

Please sign in to comment.