Skip to content

Commit

Permalink
Merge branch 'beta' into feature/JST-949/lease-module
Browse files Browse the repository at this point in the history
  • Loading branch information
SewerynKras committed May 29, 2024
2 parents de05c7f + 4b6910c commit e3b384b
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 99 deletions.
20 changes: 7 additions & 13 deletions src/lease-process/lease-process-pool.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { _, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito";
import type { Agreement } from "../market/agreement/agreement";
import { _, imock, instance, mock, reset, spy, verify, when } from "@johanblumenberg/ts-mockito";
import { LeaseProcess } from "./lease-process";
import { Allocation } from "../payment";
import type { MarketModule } from "../market";
Expand Down Expand Up @@ -98,22 +98,16 @@ describe("LeaseProcessPool", () => {
});
it("stops retrying after specified timeout is reached", async () => {
const pool = getLeasePool({ min: 3 });
pool["createNewLeaseProcess"] = jest
.fn(
() =>
new Promise<LeaseProcess | never>((_, reject) =>
setTimeout(() => reject(new Error("Failed to propose agreement")), 50),
),
)
// the first call will succeed, the rest will fail (fall back to the first implementation)
.mockImplementationOnce(() => new Promise((resolve) => setTimeout(() => resolve(getMockLeaseProcess()), 50)));
const poolSpy = spy(pool);
when(poolSpy["createNewLeaseProcess"]())
.thenResolve(getMockLeaseProcess())
.thenReject(new Error("Failed to propose agreement"));

await expect(pool.ready(60)).rejects.toThrow(
await expect(pool.ready(10)).rejects.toThrow(
"Could not create enough lease processes to reach the minimum pool size in time",
);
expect(pool.getAvailableSize()).toBe(1);
// first loop 3 times, then 2 times
expect(pool["createNewLeaseProcess"]).toHaveBeenCalledTimes(5);
verify(poolSpy["createNewLeaseProcess"]()).atLeast(3);
});
});
describe("acquire()", () => {
Expand Down
21 changes: 9 additions & 12 deletions src/lease-process/lease-process-pool.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import type { Agreement } from "../market/agreement/agreement";
import type { Logger } from "../shared/utils";
import { sleep } from "../shared/utils";
import { createAbortSignalFromTimeout, runOnNextEventLoopIteration } from "../shared/utils";
import type { DraftOfferProposalPool, MarketModule } from "../market";
import { GolemMarketError, MarketErrorCode } from "../market";
import { EventEmitter } from "eventemitter3";
import type { RequireAtLeastOne } from "../shared/utils/types";
import type { Allocation } from "../payment";
import type { LeaseProcess, LeaseProcessOptions } from "./lease-process";
import { Network, NetworkModule } from "../network";
import { createAbortSignalFromTimeout } from "../shared/utils/abortSignal";
import { LeaseModule } from "./lease.module";

export interface LeaseProcessPoolDependencies {
Expand Down Expand Up @@ -299,10 +298,7 @@ export class LeaseProcessPool {
}
const signal = createAbortSignalFromTimeout(timeoutOrAbortSignal);

while (this.minPoolSize > this.getAvailableSize()) {
if (signal.aborted) {
break;
}
const tryCreatingMissingLeaseProcesses = async () => {
await Promise.allSettled(
new Array(this.minPoolSize - this.getAvailableSize()).fill(0).map(() =>
this.createNewLeaseProcess().then(
Expand All @@ -311,12 +307,13 @@ export class LeaseProcessPool {
),
),
);
// Wait for at least 1 tick before trying again
// otherwise there is a risk of blocking the event loop and:
// a) the pool will never get the chance to gain more offers
// b) the abort signal will never change, because timers are processed once a tick
// leading to an infinite loop
await sleep(0);
};

while (this.minPoolSize > this.getAvailableSize()) {
if (signal.aborted) {
break;
}
await runOnNextEventLoopIteration(tryCreatingMissingLeaseProcesses);
}

if (this.minPoolSize > this.getAvailableSize()) {
Expand Down
113 changes: 45 additions & 68 deletions src/market/market.module.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { _, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito";
import { _, imock, instance, mock, reset, spy, verify, when } from "@johanblumenberg/ts-mockito";
import { Logger, YagnaApi } from "../shared/utils";
import { MarketModuleImpl } from "./market.module";
import * as YaTsClient from "ya-ts-client";
Expand Down Expand Up @@ -473,90 +473,67 @@ describe("Market module", () => {
});
});
describe("signAgreementFromPool()", () => {
beforeEach(() => {
jest.useRealTimers();
});
it("should keep acquiring proposals until one is successfully signed", async () => {
const badProposal0 = {} as OfferProposal;
const badProposal1 = {} as OfferProposal;
const goodProposal = {} as OfferProposal;
const mockAcquire: DraftOfferProposalPool["acquire"] = jest
.fn()
.mockResolvedValueOnce(badProposal0)
.mockResolvedValueOnce(badProposal1)
.mockResolvedValueOnce(goodProposal);
const mockRemove: DraftOfferProposalPool["remove"] = jest.fn();
const mockPool = {
acquire: mockAcquire,
remove: mockRemove,
} as DraftOfferProposalPool;
const mockPool = mock(DraftOfferProposalPool);
when(mockPool.acquire()).thenResolve(badProposal0).thenResolve(badProposal1).thenResolve(goodProposal);
when(mockPool.remove(_)).thenResolve();
const goodAgreement = {} as Agreement;
jest
.spyOn(marketModule, "proposeAgreement")
.mockRejectedValueOnce(new Error("Failed to sign proposal"))
.mockRejectedValueOnce(new Error("Failed to sign proposal"))
.mockResolvedValueOnce(goodAgreement);

const signedProposal = await marketModule.signAgreementFromPool(mockPool);
expect(mockAcquire).toHaveBeenCalledTimes(3);
expect(marketModule.proposeAgreement).toHaveBeenCalledTimes(3);
expect(mockRemove).toHaveBeenCalledTimes(3);
expect(mockRemove).toHaveBeenCalledWith(badProposal0);
expect(mockRemove).toHaveBeenCalledWith(badProposal1);
// goodProposal should also be removed after signing
expect(mockRemove).toHaveBeenCalledWith(goodProposal);
const marketSpy = spy(marketModule);
when(marketSpy.proposeAgreement(goodProposal)).thenResolve(goodAgreement);
when(marketSpy.proposeAgreement(badProposal0)).thenReject(new Error("Failed to sign proposal"));
when(marketSpy.proposeAgreement(badProposal1)).thenReject(new Error("Failed to sign proposal"));

const signedProposal = await marketModule.signAgreementFromPool(instance(mockPool));

verify(mockPool.acquire()).thrice();
verify(marketSpy.proposeAgreement(badProposal0)).once();
verify(mockPool.remove(badProposal0)).once();
verify(marketSpy.proposeAgreement(badProposal1)).once();
verify(mockPool.remove(badProposal1)).once();
verify(marketSpy.proposeAgreement(goodProposal)).once();
verify(mockPool.remove(goodProposal)).once();
expect(signedProposal).toBe(goodAgreement);
});
it("should release the proposal if the operation is cancelled between acquiring and signing", async () => {
jest.useRealTimers();
const ac = new AbortController();
const error = new Error("Operation cancelled");
const proposal = {} as OfferProposal;
// mock acquire to return a proposal after 100ms
const mockAcquire: DraftOfferProposalPool["acquire"] = jest.fn().mockImplementation(() => {
return new Promise((resolve) => {
setTimeout(() => {
resolve(proposal);
}, 100);
});
const mockPool = mock(DraftOfferProposalPool);
when(mockPool.acquire()).thenCall(async () => {
ac.abort(error);
return proposal;
});
const mockRemove: DraftOfferProposalPool["remove"] = jest.fn();
const mockRelease: DraftOfferProposalPool["release"] = jest.fn();
const mockPool = {
acquire: mockAcquire,
remove: mockRemove,
release: mockRelease,
} as DraftOfferProposalPool;
jest.spyOn(marketModule, "proposeAgreement");
const marketSpy = spy(marketModule);

// cancel the operation after 50ms
const signal = AbortSignal.timeout(50);
await expect(marketModule.signAgreementFromPool(mockPool, signal)).rejects.toThrow(
"The operation was aborted due to timeout",
);
await expect(marketModule.signAgreementFromPool(instance(mockPool), ac.signal)).rejects.toThrow(error);

expect(mockAcquire).toHaveBeenCalledTimes(1);
expect(mockRemove).toHaveBeenCalledTimes(0);
expect(mockRelease).toHaveBeenCalledTimes(1);
expect(mockRelease).toHaveBeenCalledWith(proposal);
expect(marketModule.proposeAgreement).toHaveBeenCalledTimes(0);
verify(mockPool.acquire()).once();
verify(mockPool.release(proposal)).once();
verify(mockPool.remove(_)).never();
verify(marketSpy.proposeAgreement(_)).never();
});
it("should abort immediately if the given signal is already aborted", async () => {
const mockAcquire: DraftOfferProposalPool["acquire"] = jest.fn();
const mockPool = {
acquire: mockAcquire,
} as DraftOfferProposalPool;
const mockPool = mock(DraftOfferProposalPool);
const signal = AbortSignal.abort();
await expect(marketModule.signAgreementFromPool(mockPool, signal)).rejects.toThrow("This operation was aborted");
expect(mockAcquire).toHaveBeenCalledTimes(0);
await expect(marketModule.signAgreementFromPool(instance(mockPool), signal)).rejects.toThrow(
"This operation was aborted",
);
verify(mockPool.acquire()).never();
});
it("should abort after a set timeout", async () => {
const mockAcquire: DraftOfferProposalPool["acquire"] = jest
.fn()
.mockImplementation(() => new Promise((resolve) => setTimeout(resolve, 10)));
const mockPool = {
acquire: mockAcquire,
} as DraftOfferProposalPool;
jest
.spyOn(marketModule, "proposeAgreement")
.mockImplementation(() => new Promise((resolve) => setTimeout(resolve, 10)));
const timeout = 50;
expect(marketModule.signAgreementFromPool(mockPool, timeout)).rejects.toThrow(
const mockPool = mock(DraftOfferProposalPool);
when(mockPool.acquire()).thenResolve({} as OfferProposal);
when(mockPool.remove(_)).thenResolve();
const marketSpy = spy(marketModule);
when(marketSpy.proposeAgreement(_)).thenReject(new Error("Failed to sign proposal"));

await expect(marketModule.signAgreementFromPool(instance(mockPool), 50)).rejects.toThrow(
"The operation was aborted due to timeout",
);
});
Expand Down
19 changes: 13 additions & 6 deletions src/market/market.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
MarketErrorCode,
NewProposalEvent,
} from "./index";
import { defaultLogger, Logger, YagnaApi } from "../shared/utils";
import { defaultLogger, Logger, runOnNextEventLoopIteration, YagnaApi } from "../shared/utils";
import { Allocation, IPaymentApi } from "../payment";
import { bufferTime, catchError, filter, map, mergeMap, Observable, of, OperatorFunction, switchMap, tap } from "rxjs";
import { IProposalRepository, OfferProposal, ProposalFilterNew } from "./offer-proposal";
Expand Down Expand Up @@ -408,12 +408,19 @@ export class MarketModuleImpl implements MarketModule {
try {
const agreement = await this.proposeAgreement(proposal);
// agreement is valid, proposal can be destroyed
await draftProposalPool.remove(proposal);
await draftProposalPool.remove(proposal).catch((error) => {
this.logger.warn("Signed the agreement but failed to remove the proposal from the pool", { error });
});
return agreement;
} catch {
// If the proposal is not valid, remove it from the pool and try again
await draftProposalPool.remove(proposal);
return tryProposing();
} catch (error) {
this.logger.debug("Failed to propose agreement, retrying", { error });
// We failed to propose the agreement, destroy the proposal and try again with another one
await draftProposalPool.remove(proposal).catch((error) => {
this.logger.warn("Failed to remove the proposal from the pool after unsuccessful agreement proposal", {
error,
});
});
return runOnNextEventLoopIteration(tryProposing);
}
};
return tryProposing();
Expand Down
30 changes: 30 additions & 0 deletions src/shared/utils/eventLoop.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Run a callback on the next event loop iteration ("promote" a microtask to a task using setTimeout).
* Note that this is not guaranteed to run on the very next iteration, but it will run as soon as possible.
* This function is designed to avoid the problem of microtasks queueing other microtasks in an infinite loop.
* See the example below for a common pitfall that this function can help avoid.
* Learn more about microtasks and their relation to async/await here:
* https://developer.mozilla.org/en-US/docs/Web/API/HTML_DOM_API/Microtask_guide/In_depth
* https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/await#control_flow_effects_of_await
* @param cb The callback to run on the next event loop iteration.
* @example
* ```ts
* const signal = AbortSignal.timeout(1_000);
* // This loop will run for 1 second, then stop.
* while (!signal.aborted) {
* await runOnNextEventLoopIteration(() => Promise.resolve());
* }
*
* const signal = AbortSignal.timeout(1_000);
* // This loop will run indefinitely.
* // Each while loop iteration queues a microtask, which itself queues another microtask, and so on.
* while (!signal.aborted) {
* await Promise.resolve();
* }
* ```
*/
export function runOnNextEventLoopIteration<T>(cb: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
setTimeout(() => cb().then(resolve).catch(reject));
});
}
2 changes: 2 additions & 0 deletions src/shared/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ export { nullLogger } from "./logger/nullLogger";
export { defaultLogger } from "./logger/defaultLogger";
export * as EnvUtils from "./env";
export { YagnaApi, YagnaOptions } from "../yagna/yagnaApi";
export * from "./abortSignal";
export * from "./eventLoop";

0 comments on commit e3b384b

Please sign in to comment.