Skip to content

Commit

Permalink
suggestions from code review
Browse files Browse the repository at this point in the history
thanks @jerelmiller!
  • Loading branch information
phryneas committed Oct 4, 2023
1 parent 2ba8d99 commit 86e5d2b
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .changeset/hungry-vans-walk.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
"@apollo/client": patch
---

fixes a one-tick-race condition in `asyncMap`
Fixes a race condition in asyncMap that caused issues in React Native when errors were returned in the response payload along with a data property that was null.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Observable } from "../../utilities/index.js";

interface PeekOptions {
interface TakeOptions {
timeout?: number;
}
type ObservableEvent<T> =
Expand All @@ -11,14 +11,14 @@ type ObservableEvent<T> =
async function* observableToAsyncEventIterator<T>(observable: Observable<T>) {
let resolveNext: (value: ObservableEvent<T>) => void;
const promises: Promise<ObservableEvent<T>>[] = [];
pushPromise();
queuePromise();

function pushPromise() {
function queuePromise() {
promises.push(
new Promise<ObservableEvent<T>>((resolve) => {
resolveNext = (event: ObservableEvent<T>) => {
resolve(event)
pushPromise();
resolve(event);
queuePromise();
};
})
);
Expand All @@ -35,10 +35,10 @@ async function* observableToAsyncEventIterator<T>(observable: Observable<T>) {
}
}

class IteratorTaker<T> {
class IteratorStream<T> {
constructor(private iterator: AsyncGenerator<T, void, unknown>) {}

async take({ timeout = 100 }: PeekOptions = {}): Promise<T> {
async take({ timeout = 100 }: TakeOptions = {}): Promise<T> {
return Promise.race([
this.iterator.next().then((result) => result.value!),
new Promise<T>((_, reject) => {
Expand All @@ -52,24 +52,24 @@ class IteratorTaker<T> {
}
}

export class ObservableTaker<T> extends IteratorTaker<ObservableEvent<T>> {
export class ObservableStream<T> extends IteratorStream<ObservableEvent<T>> {
constructor(observable: Observable<T>) {
super(observableToAsyncEventIterator(observable));
}

async takeNext(options?: PeekOptions): Promise<T> {
async takeNext(options?: TakeOptions): Promise<T> {
const event = await this.take(options);
expect(event).toEqual({ type: "next", value: expect.anything() });
return (event as ObservableEvent<T> & { type: "next" }).value;
}

async takeError(options?: PeekOptions): Promise<any> {
async takeError(options?: TakeOptions): Promise<any> {
const event = await this.take(options);
expect(event).toEqual({ type: "error", error: expect.anything() });
return (event as ObservableEvent<T> & { type: "error" }).error;
}

async takeComplete(options?: PeekOptions): Promise<void> {
async takeComplete(options?: TakeOptions): Promise<void> {
const event = await this.take(options);
expect(event).toEqual({ type: "complete" });
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Observable } from "../../../utilities";
import { ObservableTaker } from "../ObservableTaker";
import { ObservableStream } from "../ObservableStream";

it("allows to step through an observable until completion", async () => {
const taker = new ObservableTaker(
const taker = new ObservableStream(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
Expand All @@ -17,7 +17,7 @@ it("allows to step through an observable until completion", async () => {
});

it("allows to step through an observable until error", async () => {
const taker = new ObservableTaker(
const taker = new ObservableStream(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
Expand All @@ -32,7 +32,7 @@ it("allows to step through an observable until error", async () => {
});

it("will time out if no more value is omitted", async () => {
const taker = new ObservableTaker(
const taker = new ObservableStream(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
Expand All @@ -51,7 +51,7 @@ it.each([
["takeComplete", "next"],
["takeComplete", "error"],
])("errors when %s receives %s instead", async (expected, gotten) => {
const taker = new ObservableTaker(
const taker = new ObservableStream(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
Expand All @@ -70,7 +70,7 @@ it.each([
["takeError", "error"],
["takeComplete", "complete"],
])("succeeds when %s, receives %s", async (expected, gotten) => {
const taker = new ObservableTaker(
const taker = new ObservableStream(
new Observable<number>((observer) => {
observer.next(1);
observer.next(2);
Expand Down
2 changes: 1 addition & 1 deletion src/testing/internal/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export * from "./profile/index.js";
export * from "./disposables/index.js";
export { ObservableTaker } from "./ObservableTaker.js";
export { ObservableStream } from "./ObservableStream.js";
44 changes: 22 additions & 22 deletions src/utilities/observables/__tests__/asyncMap.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Observable } from "../Observable";
import { asyncMap } from "../asyncMap";
import { itAsync } from "../../../testing";
import { ObservableTaker } from "../../../testing/internal";
import { ObservableStream } from "../../../testing/internal";
const wait = (delayMs: number) =>
new Promise<void>((resolve) => setTimeout(resolve, delayMs));

Expand Down Expand Up @@ -158,12 +158,12 @@ describe("asyncMap", () => {
}, 10);
});
const mapped = asyncMap(observable, mapFn);
const taker = new ObservableTaker(mapped);
await expect(taker.takeNext()).resolves.toBe(2);
await expect(taker.takeNext()).resolves.toBe(4);
await expect(taker.takeNext()).resolves.toBe(6);
await expect(taker.takeNext()).resolves.toBe(8);
await taker.takeComplete();
const stream = new ObservableStream(mapped);
await expect(stream.takeNext()).resolves.toBe(2);
await expect(stream.takeNext()).resolves.toBe(4);
await expect(stream.takeNext()).resolves.toBe(6);
await expect(stream.takeNext()).resolves.toBe(8);
await stream.takeComplete();
});

test.each([["sync"], ["async"]])(
Expand Down Expand Up @@ -198,12 +198,12 @@ describe("asyncMap", () => {
return n * 2;
}
);
const taker = new ObservableTaker(mapped);
await expect(taker.takeNext()).resolves.toBe(2);
await expect(taker.takeNext()).resolves.toBe(4);
await expect(taker.takeError()).resolves.toEqual(new Error("expected"));
const stream = new ObservableStream(mapped);
await expect(stream.takeNext()).resolves.toBe(2);
await expect(stream.takeNext()).resolves.toBe(4);
await expect(stream.takeError()).resolves.toEqual(new Error("expected"));
// no more emits
expect(taker.take()).rejects.toMatch(/timeout/i);
expect(stream.take()).rejects.toMatch(/timeout/i);
// the observer was closed after the error, so we don't expect `mapFn` to
// be called for values that will not be emitted
expect(lastMapped).toBe(3);
Expand All @@ -224,13 +224,13 @@ describe("asyncMap", () => {
}, 10);
});
const mapped = asyncMap(observable, (n) => n * 2, catchFn);
const taker = new ObservableTaker(mapped);
await expect(taker.takeNext()).resolves.toBe(2);
await expect(taker.takeNext()).resolves.toBe(4);
await expect(taker.takeNext()).resolves.toBe(99);
const stream = new ObservableStream(mapped);
await expect(stream.takeNext()).resolves.toBe(2);
await expect(stream.takeNext()).resolves.toBe(4);
await expect(stream.takeNext()).resolves.toBe(99);
// even after recovery, further `.next` inside the observer will be ignored
// by the parent Observable itself, so asyncMap cannot do anything about that
expect(taker.take()).rejects.toMatch(/timeout/i);
expect(stream.take()).rejects.toMatch(/timeout/i);
});

test.each([
Expand All @@ -249,14 +249,14 @@ describe("asyncMap", () => {
}, 10);
});
const mapped = asyncMap(observable, (n) => n * 2, catchFn);
const taker = new ObservableTaker(mapped);
await expect(taker.takeNext()).resolves.toBe(2);
await expect(taker.takeNext()).resolves.toBe(4);
await expect(taker.takeError()).resolves.toEqual(
const stream = new ObservableStream(mapped);
await expect(stream.takeNext()).resolves.toBe(2);
await expect(stream.takeNext()).resolves.toBe(4);
await expect(stream.takeError()).resolves.toEqual(
new Error("another error")
);
// even after recovery, further `.next` inside the observer will be ignored
// by the Observable itself, so asyncMap cannot do anything about that
expect(taker.take()).rejects.toMatch(/timeout/i);
expect(stream.take()).rejects.toMatch(/timeout/i);
});
});

0 comments on commit 86e5d2b

Please sign in to comment.