diff --git a/src/lease-process/lease-process-pool.test.ts b/src/lease-process/lease-process-pool.test.ts index b4ea18d7e..cc260736b 100644 --- a/src/lease-process/lease-process-pool.test.ts +++ b/src/lease-process/lease-process-pool.test.ts @@ -1,5 +1,5 @@ -import { _, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito"; import type { Agreement } from "../market/agreement/agreement"; +import { _, imock, instance, mock, reset, spy, verify, when } from "@johanblumenberg/ts-mockito"; import { LeaseProcess } from "./lease-process"; import { Allocation } from "../payment"; import type { MarketModule } from "../market"; @@ -98,22 +98,16 @@ describe("LeaseProcessPool", () => { }); it("stops retrying after specified timeout is reached", async () => { const pool = getLeasePool({ min: 3 }); - pool["createNewLeaseProcess"] = jest - .fn( - () => - new Promise((_, reject) => - setTimeout(() => reject(new Error("Failed to propose agreement")), 50), - ), - ) - // the first call will succeed, the rest will fail (fall back to the first implementation) - .mockImplementationOnce(() => new Promise((resolve) => setTimeout(() => resolve(getMockLeaseProcess()), 50))); + const poolSpy = spy(pool); + when(poolSpy["createNewLeaseProcess"]()) + .thenResolve(getMockLeaseProcess()) + .thenReject(new Error("Failed to propose agreement")); - await expect(pool.ready(60)).rejects.toThrow( + await expect(pool.ready(10)).rejects.toThrow( "Could not create enough lease processes to reach the minimum pool size in time", ); expect(pool.getAvailableSize()).toBe(1); - // first loop 3 times, then 2 times - expect(pool["createNewLeaseProcess"]).toHaveBeenCalledTimes(5); + verify(poolSpy["createNewLeaseProcess"]()).atLeast(3); }); }); describe("acquire()", () => { diff --git a/src/lease-process/lease-process-pool.ts b/src/lease-process/lease-process-pool.ts index ed08ed776..b5f31d679 100644 --- a/src/lease-process/lease-process-pool.ts +++ b/src/lease-process/lease-process-pool.ts @@ -1,6 +1,6 @@ import type { Agreement } from "../market/agreement/agreement"; import type { Logger } from "../shared/utils"; -import { sleep } from "../shared/utils"; +import { createAbortSignalFromTimeout, runOnNextEventLoopIteration } from "../shared/utils"; import type { DraftOfferProposalPool, MarketModule } from "../market"; import { GolemMarketError, MarketErrorCode } from "../market"; import { EventEmitter } from "eventemitter3"; @@ -8,7 +8,6 @@ import type { RequireAtLeastOne } from "../shared/utils/types"; import type { Allocation } from "../payment"; import type { LeaseProcess, LeaseProcessOptions } from "./lease-process"; import { Network, NetworkModule } from "../network"; -import { createAbortSignalFromTimeout } from "../shared/utils/abortSignal"; import { LeaseModule } from "./lease.module"; export interface LeaseProcessPoolDependencies { @@ -299,10 +298,7 @@ export class LeaseProcessPool { } const signal = createAbortSignalFromTimeout(timeoutOrAbortSignal); - while (this.minPoolSize > this.getAvailableSize()) { - if (signal.aborted) { - break; - } + const tryCreatingMissingLeaseProcesses = async () => { await Promise.allSettled( new Array(this.minPoolSize - this.getAvailableSize()).fill(0).map(() => this.createNewLeaseProcess().then( @@ -311,12 +307,13 @@ export class LeaseProcessPool { ), ), ); - // Wait for at least 1 tick before trying again - // otherwise there is a risk of blocking the event loop and: - // a) the pool will never get the chance to gain more offers - // b) the abort signal will never change, because timers are processed once a tick - // leading to an infinite loop - await sleep(0); + }; + + while (this.minPoolSize > this.getAvailableSize()) { + if (signal.aborted) { + break; + } + await runOnNextEventLoopIteration(tryCreatingMissingLeaseProcesses); } if (this.minPoolSize > this.getAvailableSize()) { diff --git a/src/market/market.module.test.ts b/src/market/market.module.test.ts index 25ca696ec..545b706aa 100644 --- a/src/market/market.module.test.ts +++ b/src/market/market.module.test.ts @@ -1,4 +1,4 @@ -import { _, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito"; +import { _, imock, instance, mock, reset, spy, verify, when } from "@johanblumenberg/ts-mockito"; import { Logger, YagnaApi } from "../shared/utils"; import { MarketModuleImpl } from "./market.module"; import * as YaTsClient from "ya-ts-client"; @@ -473,90 +473,67 @@ describe("Market module", () => { }); }); describe("signAgreementFromPool()", () => { + beforeEach(() => { + jest.useRealTimers(); + }); it("should keep acquiring proposals until one is successfully signed", async () => { const badProposal0 = {} as OfferProposal; const badProposal1 = {} as OfferProposal; const goodProposal = {} as OfferProposal; - const mockAcquire: DraftOfferProposalPool["acquire"] = jest - .fn() - .mockResolvedValueOnce(badProposal0) - .mockResolvedValueOnce(badProposal1) - .mockResolvedValueOnce(goodProposal); - const mockRemove: DraftOfferProposalPool["remove"] = jest.fn(); - const mockPool = { - acquire: mockAcquire, - remove: mockRemove, - } as DraftOfferProposalPool; + const mockPool = mock(DraftOfferProposalPool); + when(mockPool.acquire()).thenResolve(badProposal0).thenResolve(badProposal1).thenResolve(goodProposal); + when(mockPool.remove(_)).thenResolve(); const goodAgreement = {} as Agreement; - jest - .spyOn(marketModule, "proposeAgreement") - .mockRejectedValueOnce(new Error("Failed to sign proposal")) - .mockRejectedValueOnce(new Error("Failed to sign proposal")) - .mockResolvedValueOnce(goodAgreement); - - const signedProposal = await marketModule.signAgreementFromPool(mockPool); - expect(mockAcquire).toHaveBeenCalledTimes(3); - expect(marketModule.proposeAgreement).toHaveBeenCalledTimes(3); - expect(mockRemove).toHaveBeenCalledTimes(3); - expect(mockRemove).toHaveBeenCalledWith(badProposal0); - expect(mockRemove).toHaveBeenCalledWith(badProposal1); - // goodProposal should also be removed after signing - expect(mockRemove).toHaveBeenCalledWith(goodProposal); + const marketSpy = spy(marketModule); + when(marketSpy.proposeAgreement(goodProposal)).thenResolve(goodAgreement); + when(marketSpy.proposeAgreement(badProposal0)).thenReject(new Error("Failed to sign proposal")); + when(marketSpy.proposeAgreement(badProposal1)).thenReject(new Error("Failed to sign proposal")); + + const signedProposal = await marketModule.signAgreementFromPool(instance(mockPool)); + + verify(mockPool.acquire()).thrice(); + verify(marketSpy.proposeAgreement(badProposal0)).once(); + verify(mockPool.remove(badProposal0)).once(); + verify(marketSpy.proposeAgreement(badProposal1)).once(); + verify(mockPool.remove(badProposal1)).once(); + verify(marketSpy.proposeAgreement(goodProposal)).once(); + verify(mockPool.remove(goodProposal)).once(); expect(signedProposal).toBe(goodAgreement); }); it("should release the proposal if the operation is cancelled between acquiring and signing", async () => { - jest.useRealTimers(); + const ac = new AbortController(); + const error = new Error("Operation cancelled"); const proposal = {} as OfferProposal; - // mock acquire to return a proposal after 100ms - const mockAcquire: DraftOfferProposalPool["acquire"] = jest.fn().mockImplementation(() => { - return new Promise((resolve) => { - setTimeout(() => { - resolve(proposal); - }, 100); - }); + const mockPool = mock(DraftOfferProposalPool); + when(mockPool.acquire()).thenCall(async () => { + ac.abort(error); + return proposal; }); - const mockRemove: DraftOfferProposalPool["remove"] = jest.fn(); - const mockRelease: DraftOfferProposalPool["release"] = jest.fn(); - const mockPool = { - acquire: mockAcquire, - remove: mockRemove, - release: mockRelease, - } as DraftOfferProposalPool; - jest.spyOn(marketModule, "proposeAgreement"); + const marketSpy = spy(marketModule); - // cancel the operation after 50ms - const signal = AbortSignal.timeout(50); - await expect(marketModule.signAgreementFromPool(mockPool, signal)).rejects.toThrow( - "The operation was aborted due to timeout", - ); + await expect(marketModule.signAgreementFromPool(instance(mockPool), ac.signal)).rejects.toThrow(error); - expect(mockAcquire).toHaveBeenCalledTimes(1); - expect(mockRemove).toHaveBeenCalledTimes(0); - expect(mockRelease).toHaveBeenCalledTimes(1); - expect(mockRelease).toHaveBeenCalledWith(proposal); - expect(marketModule.proposeAgreement).toHaveBeenCalledTimes(0); + verify(mockPool.acquire()).once(); + verify(mockPool.release(proposal)).once(); + verify(mockPool.remove(_)).never(); + verify(marketSpy.proposeAgreement(_)).never(); }); it("should abort immediately if the given signal is already aborted", async () => { - const mockAcquire: DraftOfferProposalPool["acquire"] = jest.fn(); - const mockPool = { - acquire: mockAcquire, - } as DraftOfferProposalPool; + const mockPool = mock(DraftOfferProposalPool); const signal = AbortSignal.abort(); - await expect(marketModule.signAgreementFromPool(mockPool, signal)).rejects.toThrow("This operation was aborted"); - expect(mockAcquire).toHaveBeenCalledTimes(0); + await expect(marketModule.signAgreementFromPool(instance(mockPool), signal)).rejects.toThrow( + "This operation was aborted", + ); + verify(mockPool.acquire()).never(); }); it("should abort after a set timeout", async () => { - const mockAcquire: DraftOfferProposalPool["acquire"] = jest - .fn() - .mockImplementation(() => new Promise((resolve) => setTimeout(resolve, 10))); - const mockPool = { - acquire: mockAcquire, - } as DraftOfferProposalPool; - jest - .spyOn(marketModule, "proposeAgreement") - .mockImplementation(() => new Promise((resolve) => setTimeout(resolve, 10))); - const timeout = 50; - expect(marketModule.signAgreementFromPool(mockPool, timeout)).rejects.toThrow( + const mockPool = mock(DraftOfferProposalPool); + when(mockPool.acquire()).thenResolve({} as OfferProposal); + when(mockPool.remove(_)).thenResolve(); + const marketSpy = spy(marketModule); + when(marketSpy.proposeAgreement(_)).thenReject(new Error("Failed to sign proposal")); + + await expect(marketModule.signAgreementFromPool(instance(mockPool), 50)).rejects.toThrow( "The operation was aborted due to timeout", ); }); diff --git a/src/market/market.module.ts b/src/market/market.module.ts index ca98c2355..e49ec0ae6 100644 --- a/src/market/market.module.ts +++ b/src/market/market.module.ts @@ -8,7 +8,7 @@ import { MarketErrorCode, NewProposalEvent, } from "./index"; -import { defaultLogger, Logger, YagnaApi } from "../shared/utils"; +import { defaultLogger, Logger, runOnNextEventLoopIteration, YagnaApi } from "../shared/utils"; import { Allocation, IPaymentApi } from "../payment"; import { bufferTime, catchError, filter, map, mergeMap, Observable, of, OperatorFunction, switchMap, tap } from "rxjs"; import { IProposalRepository, OfferProposal, ProposalFilterNew } from "./offer-proposal"; @@ -408,12 +408,19 @@ export class MarketModuleImpl implements MarketModule { try { const agreement = await this.proposeAgreement(proposal); // agreement is valid, proposal can be destroyed - await draftProposalPool.remove(proposal); + await draftProposalPool.remove(proposal).catch((error) => { + this.logger.warn("Signed the agreement but failed to remove the proposal from the pool", { error }); + }); return agreement; - } catch { - // If the proposal is not valid, remove it from the pool and try again - await draftProposalPool.remove(proposal); - return tryProposing(); + } catch (error) { + this.logger.debug("Failed to propose agreement, retrying", { error }); + // We failed to propose the agreement, destroy the proposal and try again with another one + await draftProposalPool.remove(proposal).catch((error) => { + this.logger.warn("Failed to remove the proposal from the pool after unsuccessful agreement proposal", { + error, + }); + }); + return runOnNextEventLoopIteration(tryProposing); } }; return tryProposing(); diff --git a/src/shared/utils/eventLoop.ts b/src/shared/utils/eventLoop.ts new file mode 100644 index 000000000..820cb15c2 --- /dev/null +++ b/src/shared/utils/eventLoop.ts @@ -0,0 +1,30 @@ +/** + * Run a callback on the next event loop iteration ("promote" a microtask to a task using setTimeout). + * Note that this is not guaranteed to run on the very next iteration, but it will run as soon as possible. + * This function is designed to avoid the problem of microtasks queueing other microtasks in an infinite loop. + * See the example below for a common pitfall that this function can help avoid. + * Learn more about microtasks and their relation to async/await here: + * https://developer.mozilla.org/en-US/docs/Web/API/HTML_DOM_API/Microtask_guide/In_depth + * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/await#control_flow_effects_of_await + * @param cb The callback to run on the next event loop iteration. + * @example + * ```ts + * const signal = AbortSignal.timeout(1_000); + * // This loop will run for 1 second, then stop. + * while (!signal.aborted) { + * await runOnNextEventLoopIteration(() => Promise.resolve()); + * } + * + * const signal = AbortSignal.timeout(1_000); + * // This loop will run indefinitely. + * // Each while loop iteration queues a microtask, which itself queues another microtask, and so on. + * while (!signal.aborted) { + * await Promise.resolve(); + * } + * ``` + */ +export function runOnNextEventLoopIteration(cb: () => Promise): Promise { + return new Promise((resolve, reject) => { + setTimeout(() => cb().then(resolve).catch(reject)); + }); +} diff --git a/src/shared/utils/index.ts b/src/shared/utils/index.ts index ddd76c2f5..40a13a3c4 100644 --- a/src/shared/utils/index.ts +++ b/src/shared/utils/index.ts @@ -7,3 +7,5 @@ export { nullLogger } from "./logger/nullLogger"; export { defaultLogger } from "./logger/defaultLogger"; export * as EnvUtils from "./env"; export { YagnaApi, YagnaOptions } from "../yagna/yagnaApi"; +export * from "./abortSignal"; +export * from "./eventLoop";