Skip to content

Commit

Permalink
new ObservableTaker testing utils
Browse files Browse the repository at this point in the history
  • Loading branch information
phryneas committed Sep 26, 2023
1 parent 591d6c1 commit dfd9b74
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 1 deletion.
77 changes: 77 additions & 0 deletions src/testing/internal/ObservableTaker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import type { Observable } from "../../utilities/index.js";

interface PeekOptions {
timeout?: number;
}
type ObservableEvent<T> =
| { type: "next"; value: T }
| { type: "error"; error: any }
| { type: "complete" };

async function* observableToAsyncEventIterator<T>(observable: Observable<T>) {
let resolveNext: undefined | ((value: ObservableEvent<T>) => void);
const promises: Promise<ObservableEvent<T>>[] = [];
pushPromise();

function pushPromise() {
promises.push(
new Promise<ObservableEvent<T>>((resolve) => {
resolveNext = resolve;
})
);
}

function onValue(value: ObservableEvent<T>) {
resolveNext!(value);
pushPromise();
}
observable.subscribe(
(value) => onValue({ type: "next", value }),
(error) => onValue({ type: "error", error }),
() => onValue({ type: "complete" })
);

while (true) {
yield promises.shift()!;
}
}

class IteratorTaker<T> {
constructor(private iterator: AsyncGenerator<T, void, unknown>) {}

async take({ timeout = 100 }: PeekOptions = {}): Promise<T> {
return Promise.race([
this.iterator.next().then((result) => result.value!),
new Promise<T>((_, reject) => {
setTimeout(
reject,
timeout,
new Error("Timeout waiting for next event")
);
}),
]);
}
}

export class ObservableTaker<T> extends IteratorTaker<ObservableEvent<T>> {
constructor(observable: Observable<T>) {
super(observableToAsyncEventIterator(observable));
}

async takeNext(options?: PeekOptions): Promise<T> {
const event = await this.take(options);
expect(event).toEqual({ type: "next", value: expect.anything() });
return (event as ObservableEvent<T> & { type: "next" }).value;
}

async takeError(options?: PeekOptions): Promise<any> {
const event = await this.take(options);
expect(event).toEqual({ type: "error", error: expect.anything() });
return (event as ObservableEvent<T> & { type: "error" }).error;
}

async takeComplete(options?: PeekOptions): Promise<void> {
const event = await this.take(options);
expect(event).toEqual({ type: "complete" });
}
}
85 changes: 85 additions & 0 deletions src/testing/internal/__tests__/ObservableTaker.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { Observable } from "../../../utilities";
import { ObservableTaker } from "../ObservableTaker";

it("allows to step through an observable until completion", async () => {
const taker = new ObservableTaker(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
})
);
await expect(taker.takeNext()).resolves.toBe(1);
await expect(taker.takeNext()).resolves.toBe(2);
await expect(taker.takeNext()).resolves.toBe(3);
await expect(taker.takeComplete()).resolves.toBeUndefined();
});

it("allows to step through an observable until error", async () => {
const taker = new ObservableTaker(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.error(new Error("expected"));
})
);
await expect(taker.takeNext()).resolves.toBe(1);
await expect(taker.takeNext()).resolves.toBe(2);
await expect(taker.takeNext()).resolves.toBe(3);
await expect(taker.takeError()).resolves.toEqual(expect.any(Error));
});

it("will time out if no more value is omitted", async () => {
const taker = new ObservableTaker(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
})
);
await expect(taker.takeNext()).resolves.toBe(1);
await expect(taker.takeNext()).resolves.toBe(2);
await expect(taker.takeNext()).rejects.toEqual(expect.any(Error));
});

it.each([
["takeNext", "complete"],
["takeNext", "error"],
["takeError", "complete"],
["takeError", "next"],
["takeComplete", "next"],
["takeComplete", "error"],
])("errors when %s receives %s instead", async (expected, gotten) => {
const taker = new ObservableTaker(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
// @ts-ignore
observer[gotten](3);
})
);
await expect(taker.takeNext()).resolves.toBe(1);
await expect(taker.takeNext()).resolves.toBe(2);
// @ts-ignore
await expect(taker[expected]()).rejects.toEqual(expect.any(Error));
});

it.each([
["takeNext", "next"],
["takeError", "error"],
["takeComplete", "complete"],
])("succeeds when %s, receives %s", async (expected, gotten) => {
const taker = new ObservableTaker(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
// @ts-ignore
observer[gotten](3);
})
);
await expect(taker.takeNext()).resolves.toBe(1);
await expect(taker.takeNext()).resolves.toBe(2);
// @ts-ignore this should just not throw
await taker[expected]();
});
1 change: 1 addition & 0 deletions src/testing/internal/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from "./profile/index.js";
export * from "./disposables/index.js";
export { ObservableTaker } from "./ObservableTaker.js";
120 changes: 119 additions & 1 deletion src/utilities/observables/__tests__/asyncMap.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Observable } from "../Observable";
import { asyncMap } from "../asyncMap";
import { itAsync } from "../../../testing";

import { ObservableTaker } from "../../../testing/internal";
const wait = (delayMs: number) =>
new Promise<void>((resolve) => setTimeout(resolve, delayMs));

Expand Down Expand Up @@ -141,4 +141,122 @@ describe("asyncMap", () => {
}),
});
});

test.each([
["sync", (n: number) => n * 2],
["async", async (n: number) => n * 2],
])("[%s] mapFn maps over values", async (_, mapFn) => {
const observable = new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
setTimeout(() => {
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 10);
}, 10);
});
const mapped = asyncMap(observable, mapFn);
const taker = new ObservableTaker(mapped);
await expect(taker.takeNext()).resolves.toBe(2);
await expect(taker.takeNext()).resolves.toBe(4);
await expect(taker.takeNext()).resolves.toBe(6);
await expect(taker.takeNext()).resolves.toBe(8);
await taker.takeComplete();
});

test.each([["sync"], ["async"]])(
"[%s] mapFn can convert next to error",
async (synchronity) => {
const observable = new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
setTimeout(() => {
// this will throw
observer.next(3);
// this will be swallowed and also not call `mapFn` anymore
observer.next(4);
setTimeout(() => {
observer.next(5);
observer.complete();
}, 10);
}, 10);
});
let lastMapped = 0;
const mapped = asyncMap(
observable,
synchronity === "sync"
? (n: number) => {
lastMapped = n;
if (n === 3) throw new Error("expected");
return n * 2;
}
: async (n: number) => {
lastMapped = n;
if (n === 3) throw new Error("expected");
return n * 2;
}
);
const taker = new ObservableTaker(mapped);
await expect(taker.takeNext()).resolves.toBe(2);
await expect(taker.takeNext()).resolves.toBe(4);
await expect(taker.takeError()).resolves.toEqual(new Error("expected"));
// no more emits
expect(taker.take()).rejects.toMatch(/timeout/i);
// the observer currently keeps running for values `next`ed synchonously
// even if the Observable is closed every execution will be wasted
expect(lastMapped).toBe(4);
}
);

test.each([
["sync", () => 99],
["async", async () => 99],
])("[%s] catchFn can convert error to next", async (_, catchFn) => {
const observable = new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
setTimeout(() => {
observer.error(new Error("expected"));
// will be ignored by parent Observable since the observer already closed
observer.next(4);
}, 10);
});
const mapped = asyncMap(observable, (n) => n * 2, catchFn);
const taker = new ObservableTaker(mapped);
await expect(taker.takeNext()).resolves.toBe(2);
await expect(taker.takeNext()).resolves.toBe(4);
await expect(taker.takeNext()).resolves.toBe(99);
// even after recovery, further `.next` inside the observer will be ignored
// by the parent Observable itself, so asyncMap cannot do anything about that
expect(taker.take()).rejects.toMatch(/timeout/i);
});

test.each([
// prettier-ignore
["sync", () => { throw new Error("another error") }],
// prettier-ignore
["async", async () => { throw new Error("another error") }],
])("[%s] catchFn can map one error to another error", async (_, catchFn) => {
const observable = new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
setTimeout(() => {
observer.error(new Error("expected"));
// will be ignored by Observable since the observer already closed
observer.next(4);
}, 10);
});
const mapped = asyncMap(observable, (n) => n * 2, catchFn);
const taker = new ObservableTaker(mapped);
await expect(taker.takeNext()).resolves.toBe(2);
await expect(taker.takeNext()).resolves.toBe(4);
await expect(taker.takeError()).resolves.toEqual(
new Error("another error")
);
// even after recovery, further `.next` inside the observer will be ignored
// by the Observable itself, so asyncMap cannot do anything about that
expect(taker.take()).rejects.toMatch(/timeout/i);
});
});

0 comments on commit dfd9b74

Please sign in to comment.