Skip to content

Commit

Permalink
refactor: added cleanup function for anyAbortSignal which removes lis…
Browse files Browse the repository at this point in the history
…teners
  • Loading branch information
mgordel committed Sep 3, 2024
1 parent be83305 commit 31f01aa
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 32 deletions.
26 changes: 6 additions & 20 deletions src/activity/exe-script-executor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
anyAbortSignal,
createAbortSignalFromTimeout,
Logger,
mergeUntilFirstComplete,
Expand Down Expand Up @@ -105,27 +106,17 @@ export class ExeScriptExecutor {
signalOrTimeout?: number | AbortSignal,
maxRetries?: number,
): Observable<Result> {
const abortController = new AbortController();
const signal = createAbortSignalFromTimeout(signalOrTimeout);

const onAbort = () => abortController.abort(this.abortSignal.reason || signal.reason);

if (signal.aborted || this.abortSignal.aborted) {
onAbort();
} else {
this.abortSignal.addEventListener("abort", onAbort);
signal.addEventListener("abort", onAbort);
}
const { signal, cleanup } = anyAbortSignal(this.abortSignal, createAbortSignalFromTimeout(signalOrTimeout));

// observable that emits when the script execution should be aborted
const abort$ = new Observable<never>((subscriber) => {
const getError = () => new GolemAbortError("Execution of script has been aborted", abortController.signal.reason);
const getError = () => new GolemAbortError("Execution of script has been aborted", signal.reason);

if (abortController.signal.aborted) {
if (signal.aborted) {
subscriber.error(getError());
}

abortController.signal.addEventListener("abort", () => {
signal.addEventListener("abort", () => {
subscriber.error(getError());
});
});
Expand All @@ -135,12 +126,7 @@ export class ExeScriptExecutor {
? this.streamingBatch(batch.batchId, batch.batchSize)
: this.pollingBatch(batch.batchId, maxRetries);

return mergeUntilFirstComplete(abort$, results$).pipe(
finalize(() => {
this.abortSignal.removeEventListener("abort", onAbort);
signal.removeEventListener("abort", onAbort);
}),
);
return mergeUntilFirstComplete(abort$, results$).pipe(finalize(cleanup));
}

protected async send(script: ExeScriptRequest): Promise<string> {
Expand Down
7 changes: 6 additions & 1 deletion src/golem-network/golem-network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,14 +428,19 @@ export class GolemNetwork {
* @param options.teardown - an optional function that is called before the exe unit is destroyed
*/
async oneOf({ order, setup, teardown, signalOrTimeout }: OneOfOptions): Promise<ResourceRental> {
const signal = anyAbortSignal(createAbortSignalFromTimeout(signalOrTimeout), this.abortController.signal);
const { signal, cleanup: cleanupAbortSignals } = anyAbortSignal(
createAbortSignalFromTimeout(signalOrTimeout),
this.abortController.signal,
);

let allocation: Allocation | undefined = undefined;
let proposalSubscription: Subscription | undefined = undefined;
let rental: ResourceRental | undefined = undefined;
let networkNode: NetworkNode | undefined = undefined;

const cleanup = async () => {
cleanupAbortSignals();

if (proposalSubscription) {
proposalSubscription.unsubscribe();
}
Expand Down
16 changes: 13 additions & 3 deletions src/resource-rental/resource-rental-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ export class ResourceRentalPool {

private async createNewResourceRental(signalOrTimeout?: number | AbortSignal) {
this.logger.debug("Creating new resource rental to add to pool");
const signal = anyAbortSignal(this.abortController.signal, createAbortSignalFromTimeout(signalOrTimeout));
const { signal, cleanup } = anyAbortSignal(
this.abortController.signal,
createAbortSignalFromTimeout(signalOrTimeout),
);

try {
this.rentalsBeingSigned++;
Expand Down Expand Up @@ -158,6 +161,7 @@ export class ResourceRentalPool {
throw error;
} finally {
this.rentalsBeingSigned--;
cleanup();
}
}

Expand Down Expand Up @@ -216,7 +220,7 @@ export class ResourceRentalPool {
*/
private async raceNewRentalWithAcquireQueue(signalOrTimeout?: number | AbortSignal) {
const ac = new AbortController();
const signal = anyAbortSignal(
const { signal, cleanup } = anyAbortSignal(
ac.signal,
createAbortSignalFromTimeout(signalOrTimeout),
this.abortController.signal,
Expand All @@ -235,6 +239,7 @@ export class ResourceRentalPool {
})
.finally(() => {
ac.abort();
cleanup();
});
}

Expand Down Expand Up @@ -405,7 +410,10 @@ export class ResourceRentalPool {
if (this.minPoolSize <= this.getAvailableSize()) {
return;
}
const signal = anyAbortSignal(this.abortController.signal, createAbortSignalFromTimeout(timeoutOrAbortSignal));
const { signal, cleanup } = anyAbortSignal(
this.abortController.signal,
createAbortSignalFromTimeout(timeoutOrAbortSignal),
);
const tryCreatingMissingResourceRentals = async () => {
await Promise.allSettled(
new Array(this.minPoolSize - this.getAvailableSize()).fill(0).map(() =>
Expand All @@ -424,6 +432,8 @@ export class ResourceRentalPool {
await runOnNextEventLoopIteration(tryCreatingMissingResourceRentals);
}

cleanup();

if (this.minPoolSize > this.getAvailableSize()) {
throw new Error("Could not create enough resource rentals to reach the minimum pool size in time");
}
Expand Down
31 changes: 25 additions & 6 deletions src/shared/utils/abortSignal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,44 @@ export function createAbortSignalFromTimeout(timeoutOrSignal: number | AbortSign
return new AbortController().signal;
}

interface AbortEvent extends Event {
target: EventTarget & { reason?: string | Error };
}

/**
* Combine multiple AbortSignals into a single signal that will be aborted if any
* of the input signals are aborted. If any of the input signals are already aborted,
* the returned signal will be aborted immediately.
*
* Polyfill for AbortSignal.any(), since it's only available starting in Node 20
* https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal/any_static
*
* The function returns a signal and a cleanup function that allows you
* to remove listeners when they are no longer needed.
*/
export function anyAbortSignal(...signals: AbortSignal[]) {

export function anyAbortSignal(...signals: AbortSignal[]): { signal: AbortSignal; cleanup: () => void } {
const controller = new AbortController();

const onAbort = (ev: Event) => {
if (controller.signal.aborted) return;
const reason = (ev as AbortEvent).target.reason;
controller.abort(reason);
};

for (const signal of signals) {
if (signal.aborted) {
controller.abort(signal.reason);
break;
}
signal.addEventListener("abort", () => {
if (controller.signal.aborted) return;
controller.abort(signal.reason);
});
signal.addEventListener("abort", onAbort);
}
return controller.signal;

const cleanup = () => {
for (const signal of signals) {
signal.removeEventListener("abort", onAbort);
}
};

return { signal: controller.signal, cleanup };
}
7 changes: 5 additions & 2 deletions src/shared/utils/acquireQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ export class AcquireQueue<T> {
* Queue up for the next available item.
*/
public async get(signalOrTimeout?: number | AbortSignal): Promise<T> {
const signal = anyAbortSignal(createAbortSignalFromTimeout(signalOrTimeout), this.abortController.signal);
const { signal, cleanup } = anyAbortSignal(
createAbortSignalFromTimeout(signalOrTimeout),
this.abortController.signal,
);
signal.throwIfAborted();
const { resolve, promise } = withResolvers<T>();
this.queue.push(resolve);
Expand All @@ -50,7 +53,7 @@ export class AcquireQueue<T> {
reject(signal.reason);
});
});
return Promise.race([promise, abortPromise]);
return Promise.race([promise, abortPromise]).finally(cleanup);
}

/**
Expand Down

0 comments on commit 31f01aa

Please sign in to comment.