diff --git a/src/testing/internal/ObservableTaker.ts b/src/testing/internal/ObservableTaker.ts new file mode 100644 index 00000000000..fb3902297bd --- /dev/null +++ b/src/testing/internal/ObservableTaker.ts @@ -0,0 +1,77 @@ +import type { Observable } from "../../utilities/index.js"; + +interface PeekOptions { + timeout?: number; +} +type ObservableEvent = + | { type: "next"; value: T } + | { type: "error"; error: any } + | { type: "complete" }; + +async function* observableToAsyncEventIterator(observable: Observable) { + let resolveNext: undefined | ((value: ObservableEvent) => void); + const promises: Promise>[] = []; + pushPromise(); + + function pushPromise() { + promises.push( + new Promise>((resolve) => { + resolveNext = resolve; + }) + ); + } + + function onValue(value: ObservableEvent) { + resolveNext!(value); + pushPromise(); + } + observable.subscribe( + (value) => onValue({ type: "next", value }), + (error) => onValue({ type: "error", error }), + () => onValue({ type: "complete" }) + ); + + while (true) { + yield promises.shift()!; + } +} + +class IteratorTaker { + constructor(private iterator: AsyncGenerator) {} + + async take({ timeout = 100 }: PeekOptions = {}): Promise { + return Promise.race([ + this.iterator.next().then((result) => result.value!), + new Promise((_, reject) => { + setTimeout( + reject, + timeout, + new Error("Timeout waiting for next event") + ); + }), + ]); + } +} + +export class ObservableTaker extends IteratorTaker> { + constructor(observable: Observable) { + super(observableToAsyncEventIterator(observable)); + } + + async takeNext(options?: PeekOptions): Promise { + const event = await this.take(options); + expect(event).toEqual({ type: "next", value: expect.anything() }); + return (event as ObservableEvent & { type: "next" }).value; + } + + async takeError(options?: PeekOptions): Promise { + const event = await this.take(options); + expect(event).toEqual({ type: "error", error: expect.anything() }); + return (event as ObservableEvent & { type: "error" }).error; + } + + async takeComplete(options?: PeekOptions): Promise { + const event = await this.take(options); + expect(event).toEqual({ type: "complete" }); + } +} diff --git a/src/testing/internal/__tests__/ObservableTaker.test.ts b/src/testing/internal/__tests__/ObservableTaker.test.ts new file mode 100644 index 00000000000..dbabe8b38cd --- /dev/null +++ b/src/testing/internal/__tests__/ObservableTaker.test.ts @@ -0,0 +1,85 @@ +import { Observable } from "../../../utilities"; +import { ObservableTaker } from "../ObservableTaker"; + +it("allows to step through an observable until completion", async () => { + const taker = new ObservableTaker( + new Observable((observer) => { + observer.next(1); + observer.next(2); + observer.next(3); + observer.complete(); + }) + ); + await expect(taker.takeNext()).resolves.toBe(1); + await expect(taker.takeNext()).resolves.toBe(2); + await expect(taker.takeNext()).resolves.toBe(3); + await expect(taker.takeComplete()).resolves.toBeUndefined(); +}); + +it("allows to step through an observable until error", async () => { + const taker = new ObservableTaker( + new Observable((observer) => { + observer.next(1); + observer.next(2); + observer.next(3); + observer.error(new Error("expected")); + }) + ); + await expect(taker.takeNext()).resolves.toBe(1); + await expect(taker.takeNext()).resolves.toBe(2); + await expect(taker.takeNext()).resolves.toBe(3); + await expect(taker.takeError()).resolves.toEqual(expect.any(Error)); +}); + +it("will time out if no more value is omitted", async () => { + const taker = new ObservableTaker( + new Observable((observer) => { + observer.next(1); + observer.next(2); + }) + ); + await expect(taker.takeNext()).resolves.toBe(1); + await expect(taker.takeNext()).resolves.toBe(2); + await expect(taker.takeNext()).rejects.toEqual(expect.any(Error)); +}); + +it.each([ + ["takeNext", "complete"], + ["takeNext", "error"], + ["takeError", "complete"], + ["takeError", "next"], + ["takeComplete", "next"], + ["takeComplete", "error"], +])("errors when %s receives %s instead", async (expected, gotten) => { + const taker = new ObservableTaker( + new Observable((observer) => { + observer.next(1); + observer.next(2); + // @ts-ignore + observer[gotten](3); + }) + ); + await expect(taker.takeNext()).resolves.toBe(1); + await expect(taker.takeNext()).resolves.toBe(2); + // @ts-ignore + await expect(taker[expected]()).rejects.toEqual(expect.any(Error)); +}); + +it.each([ + ["takeNext", "next"], + ["takeError", "error"], + ["takeComplete", "complete"], +])("succeeds when %s, receives %s", async (expected, gotten) => { + const taker = new ObservableTaker( + new Observable((observer) => { + observer.next(1); + observer.next(2); + // @ts-ignore + observer[gotten](3); + }) + ); + await expect(taker.takeNext()).resolves.toBe(1); + await expect(taker.takeNext()).resolves.toBe(2); + // @ts-ignore this should just not throw + await taker[expected](); +}); diff --git a/src/testing/internal/index.ts b/src/testing/internal/index.ts index 4dd162e2ca9..8bcb77e75e0 100644 --- a/src/testing/internal/index.ts +++ b/src/testing/internal/index.ts @@ -1,2 +1,3 @@ export * from "./profile/index.js"; export * from "./disposables/index.js"; +export { ObservableTaker } from "./ObservableTaker.js"; diff --git a/src/utilities/observables/__tests__/asyncMap.ts b/src/utilities/observables/__tests__/asyncMap.ts index d4a96bb0b23..2e7be1088c2 100644 --- a/src/utilities/observables/__tests__/asyncMap.ts +++ b/src/utilities/observables/__tests__/asyncMap.ts @@ -1,7 +1,7 @@ import { Observable } from "../Observable"; import { asyncMap } from "../asyncMap"; import { itAsync } from "../../../testing"; - +import { ObservableTaker } from "../../../testing/internal"; const wait = (delayMs: number) => new Promise((resolve) => setTimeout(resolve, delayMs)); @@ -141,4 +141,122 @@ describe("asyncMap", () => { }), }); }); + + test.each([ + ["sync", (n: number) => n * 2], + ["async", async (n: number) => n * 2], + ])("[%s] mapFn maps over values", async (_, mapFn) => { + const observable = new Observable((observer) => { + observer.next(1); + observer.next(2); + setTimeout(() => { + observer.next(3); + setTimeout(() => { + observer.next(4); + observer.complete(); + }, 10); + }, 10); + }); + const mapped = asyncMap(observable, mapFn); + const taker = new ObservableTaker(mapped); + await expect(taker.takeNext()).resolves.toBe(2); + await expect(taker.takeNext()).resolves.toBe(4); + await expect(taker.takeNext()).resolves.toBe(6); + await expect(taker.takeNext()).resolves.toBe(8); + await taker.takeComplete(); + }); + + test.each([["sync"], ["async"]])( + "[%s] mapFn can convert next to error", + async (synchronity) => { + const observable = new Observable((observer) => { + observer.next(1); + observer.next(2); + setTimeout(() => { + // this will throw + observer.next(3); + // this will be swallowed and also not call `mapFn` anymore + observer.next(4); + setTimeout(() => { + observer.next(5); + observer.complete(); + }, 10); + }, 10); + }); + let lastMapped = 0; + const mapped = asyncMap( + observable, + synchronity === "sync" + ? (n: number) => { + lastMapped = n; + if (n === 3) throw new Error("expected"); + return n * 2; + } + : async (n: number) => { + lastMapped = n; + if (n === 3) throw new Error("expected"); + return n * 2; + } + ); + const taker = new ObservableTaker(mapped); + await expect(taker.takeNext()).resolves.toBe(2); + await expect(taker.takeNext()).resolves.toBe(4); + await expect(taker.takeError()).resolves.toEqual(new Error("expected")); + // no more emits + expect(taker.take()).rejects.toMatch(/timeout/i); + // the observer currently keeps running for values `next`ed synchonously + // even if the Observable is closed every execution will be wasted + expect(lastMapped).toBe(4); + } + ); + + test.each([ + ["sync", () => 99], + ["async", async () => 99], + ])("[%s] catchFn can convert error to next", async (_, catchFn) => { + const observable = new Observable((observer) => { + observer.next(1); + observer.next(2); + setTimeout(() => { + observer.error(new Error("expected")); + // will be ignored by parent Observable since the observer already closed + observer.next(4); + }, 10); + }); + const mapped = asyncMap(observable, (n) => n * 2, catchFn); + const taker = new ObservableTaker(mapped); + await expect(taker.takeNext()).resolves.toBe(2); + await expect(taker.takeNext()).resolves.toBe(4); + await expect(taker.takeNext()).resolves.toBe(99); + // even after recovery, further `.next` inside the observer will be ignored + // by the parent Observable itself, so asyncMap cannot do anything about that + expect(taker.take()).rejects.toMatch(/timeout/i); + }); + + test.each([ + // prettier-ignore + ["sync", () => { throw new Error("another error") }], + // prettier-ignore + ["async", async () => { throw new Error("another error") }], + ])("[%s] catchFn can map one error to another error", async (_, catchFn) => { + const observable = new Observable((observer) => { + observer.next(1); + observer.next(2); + setTimeout(() => { + observer.error(new Error("expected")); + // will be ignored by Observable since the observer already closed + observer.next(4); + }, 10); + }); + const mapped = asyncMap(observable, (n) => n * 2, catchFn); + const taker = new ObservableTaker(mapped); + await expect(taker.takeNext()).resolves.toBe(2); + await expect(taker.takeNext()).resolves.toBe(4); + await expect(taker.takeError()).resolves.toEqual( + new Error("another error") + ); + // even after recovery, further `.next` inside the observer will be ignored + // by the Observable itself, so asyncMap cannot do anything about that + expect(taker.take()).rejects.toMatch(/timeout/i); + }); });