From 86e5d2b166cd6319facd9e2f5903b37be35d73c7 Mon Sep 17 00:00:00 2001 From: Lenz Weber-Tronic Date: Wed, 4 Oct 2023 12:16:46 +0200 Subject: [PATCH] suggestions from code review thanks @jerelmiller! --- .changeset/hungry-vans-walk.md | 2 +- ...ObservableTaker.ts => ObservableStream.ts} | 22 +++++----- ...Taker.test.ts => ObservableStream.test.ts} | 12 ++--- src/testing/internal/index.ts | 2 +- .../observables/__tests__/asyncMap.ts | 44 +++++++++---------- 5 files changed, 41 insertions(+), 41 deletions(-) rename src/testing/internal/{ObservableTaker.ts => ObservableStream.ts} (78%) rename src/testing/internal/__tests__/{ObservableTaker.test.ts => ObservableStream.test.ts} (90%) diff --git a/.changeset/hungry-vans-walk.md b/.changeset/hungry-vans-walk.md index 51d96c2f502..733ff354579 100644 --- a/.changeset/hungry-vans-walk.md +++ b/.changeset/hungry-vans-walk.md @@ -2,4 +2,4 @@ "@apollo/client": patch --- -fixes a one-tick-race condition in `asyncMap` +Fixes a race condition in asyncMap that caused issues in React Native when errors were returned in the response payload along with a data property that was null. diff --git a/src/testing/internal/ObservableTaker.ts b/src/testing/internal/ObservableStream.ts similarity index 78% rename from src/testing/internal/ObservableTaker.ts rename to src/testing/internal/ObservableStream.ts index 5a42dd8ca40..f0416692331 100644 --- a/src/testing/internal/ObservableTaker.ts +++ b/src/testing/internal/ObservableStream.ts @@ -1,6 +1,6 @@ import type { Observable } from "../../utilities/index.js"; -interface PeekOptions { +interface TakeOptions { timeout?: number; } type ObservableEvent = @@ -11,14 +11,14 @@ type ObservableEvent = async function* observableToAsyncEventIterator(observable: Observable) { let resolveNext: (value: ObservableEvent) => void; const promises: Promise>[] = []; - pushPromise(); + queuePromise(); - function pushPromise() { + function queuePromise() { promises.push( new Promise>((resolve) => { resolveNext = (event: ObservableEvent) => { - resolve(event) - pushPromise(); + resolve(event); + queuePromise(); }; }) ); @@ -35,10 +35,10 @@ async function* observableToAsyncEventIterator(observable: Observable) { } } -class IteratorTaker { +class IteratorStream { constructor(private iterator: AsyncGenerator) {} - async take({ timeout = 100 }: PeekOptions = {}): Promise { + async take({ timeout = 100 }: TakeOptions = {}): Promise { return Promise.race([ this.iterator.next().then((result) => result.value!), new Promise((_, reject) => { @@ -52,24 +52,24 @@ class IteratorTaker { } } -export class ObservableTaker extends IteratorTaker> { +export class ObservableStream extends IteratorStream> { constructor(observable: Observable) { super(observableToAsyncEventIterator(observable)); } - async takeNext(options?: PeekOptions): Promise { + async takeNext(options?: TakeOptions): Promise { const event = await this.take(options); expect(event).toEqual({ type: "next", value: expect.anything() }); return (event as ObservableEvent & { type: "next" }).value; } - async takeError(options?: PeekOptions): Promise { + async takeError(options?: TakeOptions): Promise { const event = await this.take(options); expect(event).toEqual({ type: "error", error: expect.anything() }); return (event as ObservableEvent & { type: "error" }).error; } - async takeComplete(options?: PeekOptions): Promise { + async takeComplete(options?: TakeOptions): Promise { const event = await this.take(options); expect(event).toEqual({ type: "complete" }); } diff --git a/src/testing/internal/__tests__/ObservableTaker.test.ts b/src/testing/internal/__tests__/ObservableStream.test.ts similarity index 90% rename from src/testing/internal/__tests__/ObservableTaker.test.ts rename to src/testing/internal/__tests__/ObservableStream.test.ts index dbabe8b38cd..4d499f07a06 100644 --- a/src/testing/internal/__tests__/ObservableTaker.test.ts +++ b/src/testing/internal/__tests__/ObservableStream.test.ts @@ -1,8 +1,8 @@ import { Observable } from "../../../utilities"; -import { ObservableTaker } from "../ObservableTaker"; +import { ObservableStream } from "../ObservableStream"; it("allows to step through an observable until completion", async () => { - const taker = new ObservableTaker( + const taker = new ObservableStream( new Observable((observer) => { observer.next(1); observer.next(2); @@ -17,7 +17,7 @@ it("allows to step through an observable until completion", async () => { }); it("allows to step through an observable until error", async () => { - const taker = new ObservableTaker( + const taker = new ObservableStream( new Observable((observer) => { observer.next(1); observer.next(2); @@ -32,7 +32,7 @@ it("allows to step through an observable until error", async () => { }); it("will time out if no more value is omitted", async () => { - const taker = new ObservableTaker( + const taker = new ObservableStream( new Observable((observer) => { observer.next(1); observer.next(2); @@ -51,7 +51,7 @@ it.each([ ["takeComplete", "next"], ["takeComplete", "error"], ])("errors when %s receives %s instead", async (expected, gotten) => { - const taker = new ObservableTaker( + const taker = new ObservableStream( new Observable((observer) => { observer.next(1); observer.next(2); @@ -70,7 +70,7 @@ it.each([ ["takeError", "error"], ["takeComplete", "complete"], ])("succeeds when %s, receives %s", async (expected, gotten) => { - const taker = new ObservableTaker( + const taker = new ObservableStream( new Observable((observer) => { observer.next(1); observer.next(2); diff --git a/src/testing/internal/index.ts b/src/testing/internal/index.ts index 8bcb77e75e0..73a9a00ff0e 100644 --- a/src/testing/internal/index.ts +++ b/src/testing/internal/index.ts @@ -1,3 +1,3 @@ export * from "./profile/index.js"; export * from "./disposables/index.js"; -export { ObservableTaker } from "./ObservableTaker.js"; +export { ObservableStream } from "./ObservableStream.js"; diff --git a/src/utilities/observables/__tests__/asyncMap.ts b/src/utilities/observables/__tests__/asyncMap.ts index a3c2c40a582..e5121b2821d 100644 --- a/src/utilities/observables/__tests__/asyncMap.ts +++ b/src/utilities/observables/__tests__/asyncMap.ts @@ -1,7 +1,7 @@ import { Observable } from "../Observable"; import { asyncMap } from "../asyncMap"; import { itAsync } from "../../../testing"; -import { ObservableTaker } from "../../../testing/internal"; +import { ObservableStream } from "../../../testing/internal"; const wait = (delayMs: number) => new Promise((resolve) => setTimeout(resolve, delayMs)); @@ -158,12 +158,12 @@ describe("asyncMap", () => { }, 10); }); const mapped = asyncMap(observable, mapFn); - const taker = new ObservableTaker(mapped); - await expect(taker.takeNext()).resolves.toBe(2); - await expect(taker.takeNext()).resolves.toBe(4); - await expect(taker.takeNext()).resolves.toBe(6); - await expect(taker.takeNext()).resolves.toBe(8); - await taker.takeComplete(); + const stream = new ObservableStream(mapped); + await expect(stream.takeNext()).resolves.toBe(2); + await expect(stream.takeNext()).resolves.toBe(4); + await expect(stream.takeNext()).resolves.toBe(6); + await expect(stream.takeNext()).resolves.toBe(8); + await stream.takeComplete(); }); test.each([["sync"], ["async"]])( @@ -198,12 +198,12 @@ describe("asyncMap", () => { return n * 2; } ); - const taker = new ObservableTaker(mapped); - await expect(taker.takeNext()).resolves.toBe(2); - await expect(taker.takeNext()).resolves.toBe(4); - await expect(taker.takeError()).resolves.toEqual(new Error("expected")); + const stream = new ObservableStream(mapped); + await expect(stream.takeNext()).resolves.toBe(2); + await expect(stream.takeNext()).resolves.toBe(4); + await expect(stream.takeError()).resolves.toEqual(new Error("expected")); // no more emits - expect(taker.take()).rejects.toMatch(/timeout/i); + expect(stream.take()).rejects.toMatch(/timeout/i); // the observer was closed after the error, so we don't expect `mapFn` to // be called for values that will not be emitted expect(lastMapped).toBe(3); @@ -224,13 +224,13 @@ describe("asyncMap", () => { }, 10); }); const mapped = asyncMap(observable, (n) => n * 2, catchFn); - const taker = new ObservableTaker(mapped); - await expect(taker.takeNext()).resolves.toBe(2); - await expect(taker.takeNext()).resolves.toBe(4); - await expect(taker.takeNext()).resolves.toBe(99); + const stream = new ObservableStream(mapped); + await expect(stream.takeNext()).resolves.toBe(2); + await expect(stream.takeNext()).resolves.toBe(4); + await expect(stream.takeNext()).resolves.toBe(99); // even after recovery, further `.next` inside the observer will be ignored // by the parent Observable itself, so asyncMap cannot do anything about that - expect(taker.take()).rejects.toMatch(/timeout/i); + expect(stream.take()).rejects.toMatch(/timeout/i); }); test.each([ @@ -249,14 +249,14 @@ describe("asyncMap", () => { }, 10); }); const mapped = asyncMap(observable, (n) => n * 2, catchFn); - const taker = new ObservableTaker(mapped); - await expect(taker.takeNext()).resolves.toBe(2); - await expect(taker.takeNext()).resolves.toBe(4); - await expect(taker.takeError()).resolves.toEqual( + const stream = new ObservableStream(mapped); + await expect(stream.takeNext()).resolves.toBe(2); + await expect(stream.takeNext()).resolves.toBe(4); + await expect(stream.takeError()).resolves.toEqual( new Error("another error") ); // even after recovery, further `.next` inside the observer will be ignored // by the Observable itself, so asyncMap cannot do anything about that - expect(taker.take()).rejects.toMatch(/timeout/i); + expect(stream.take()).rejects.toMatch(/timeout/i); }); });