diff --git a/.changeset/hungry-vans-walk.md b/.changeset/hungry-vans-walk.md
new file mode 100644
index 00000000000..733ff354579
--- /dev/null
+++ b/.changeset/hungry-vans-walk.md
@@ -0,0 +1,5 @@
+---
+"@apollo/client": patch
+---
+
+Fixes a race condition in asyncMap that caused issues in React Native when errors were returned in the response payload along with a data property that was null.
diff --git a/.size-limit.cjs b/.size-limit.cjs
index a1d0fadb3da..b3ed1cc372c 100644
--- a/.size-limit.cjs
+++ b/.size-limit.cjs
@@ -1,7 +1,7 @@
const checks = [
{
path: "dist/apollo-client.min.cjs",
- limit: "37986",
+ limit: "37914",
},
{
path: "dist/main.cjs",
@@ -10,7 +10,7 @@ const checks = [
{
path: "dist/index.js",
import: "{ ApolloClient, InMemoryCache, HttpLink }",
- limit: "32019",
+ limit: "31947",
},
...[
"ApolloProvider",
diff --git a/docs/source/api/link/persisted-queries.mdx b/docs/source/api/link/persisted-queries.mdx
index e20cf6f5f7c..e2b4a35da86 100644
--- a/docs/source/api/link/persisted-queries.mdx
+++ b/docs/source/api/link/persisted-queries.mdx
@@ -21,13 +21,13 @@ Malicious actors can exploit GraphQL APIs by sending large and complex requests
## Implementation steps
-Because persisted queries requires you to preregister operations, it has additional implementation steps.
+Because persisted queries requires you to preregister operations, it has additional implementation steps.
## 0. Requirements
-Persisted queries is currently in [preview](/resources/product-launch-stages#preview) and has the following requirements:
+Using persisted queries for safelisting has the following requirements:
- Apollo Client Web (v3.7.0+)
- The [`@apollo/generate-persisted-query-manifest` package](https://www.npmjs.com/package/@apollo/generate-persisted-query-manifest)
- The [`@apollo/persisted-query-lists` package](https://www.npmjs.com/package/@apollo/persisted-query-lists)
diff --git a/src/testing/internal/ObservableStream.ts b/src/testing/internal/ObservableStream.ts
new file mode 100644
index 00000000000..f0416692331
--- /dev/null
+++ b/src/testing/internal/ObservableStream.ts
@@ -0,0 +1,76 @@
+import type { Observable } from "../../utilities/index.js";
+
+interface TakeOptions {
+ timeout?: number;
+}
+type ObservableEvent =
+ | { type: "next"; value: T }
+ | { type: "error"; error: any }
+ | { type: "complete" };
+
+async function* observableToAsyncEventIterator(observable: Observable) {
+ let resolveNext: (value: ObservableEvent) => void;
+ const promises: Promise>[] = [];
+ queuePromise();
+
+ function queuePromise() {
+ promises.push(
+ new Promise>((resolve) => {
+ resolveNext = (event: ObservableEvent) => {
+ resolve(event);
+ queuePromise();
+ };
+ })
+ );
+ }
+
+ observable.subscribe(
+ (value) => resolveNext({ type: "next", value }),
+ (error) => resolveNext({ type: "error", error }),
+ () => resolveNext({ type: "complete" })
+ );
+
+ while (true) {
+ yield promises.shift()!;
+ }
+}
+
+class IteratorStream {
+ constructor(private iterator: AsyncGenerator) {}
+
+ async take({ timeout = 100 }: TakeOptions = {}): Promise {
+ return Promise.race([
+ this.iterator.next().then((result) => result.value!),
+ new Promise((_, reject) => {
+ setTimeout(
+ reject,
+ timeout,
+ new Error("Timeout waiting for next event")
+ );
+ }),
+ ]);
+ }
+}
+
+export class ObservableStream extends IteratorStream> {
+ constructor(observable: Observable) {
+ super(observableToAsyncEventIterator(observable));
+ }
+
+ async takeNext(options?: TakeOptions): Promise {
+ const event = await this.take(options);
+ expect(event).toEqual({ type: "next", value: expect.anything() });
+ return (event as ObservableEvent & { type: "next" }).value;
+ }
+
+ async takeError(options?: TakeOptions): Promise {
+ const event = await this.take(options);
+ expect(event).toEqual({ type: "error", error: expect.anything() });
+ return (event as ObservableEvent & { type: "error" }).error;
+ }
+
+ async takeComplete(options?: TakeOptions): Promise {
+ const event = await this.take(options);
+ expect(event).toEqual({ type: "complete" });
+ }
+}
diff --git a/src/testing/internal/__tests__/ObservableStream.test.ts b/src/testing/internal/__tests__/ObservableStream.test.ts
new file mode 100644
index 00000000000..b2441d30972
--- /dev/null
+++ b/src/testing/internal/__tests__/ObservableStream.test.ts
@@ -0,0 +1,85 @@
+import { Observable } from "../../../utilities";
+import { ObservableStream } from "../ObservableStream";
+
+it("allows to step through an observable until completion", async () => {
+ const stream = new ObservableStream(
+ new Observable((observer) => {
+ observer.next(1);
+ observer.next(2);
+ observer.next(3);
+ observer.complete();
+ })
+ );
+ await expect(stream.takeNext()).resolves.toBe(1);
+ await expect(stream.takeNext()).resolves.toBe(2);
+ await expect(stream.takeNext()).resolves.toBe(3);
+ await expect(stream.takeComplete()).resolves.toBeUndefined();
+});
+
+it("allows to step through an observable until error", async () => {
+ const stream = new ObservableStream(
+ new Observable((observer) => {
+ observer.next(1);
+ observer.next(2);
+ observer.next(3);
+ observer.error(new Error("expected"));
+ })
+ );
+ await expect(stream.takeNext()).resolves.toBe(1);
+ await expect(stream.takeNext()).resolves.toBe(2);
+ await expect(stream.takeNext()).resolves.toBe(3);
+ await expect(stream.takeError()).resolves.toEqual(expect.any(Error));
+});
+
+it("will time out if no more value is omitted", async () => {
+ const stream = new ObservableStream(
+ new Observable((observer) => {
+ observer.next(1);
+ observer.next(2);
+ })
+ );
+ await expect(stream.takeNext()).resolves.toBe(1);
+ await expect(stream.takeNext()).resolves.toBe(2);
+ await expect(stream.takeNext()).rejects.toEqual(expect.any(Error));
+});
+
+it.each([
+ ["takeNext", "complete"],
+ ["takeNext", "error"],
+ ["takeError", "complete"],
+ ["takeError", "next"],
+ ["takeComplete", "next"],
+ ["takeComplete", "error"],
+])("errors when %s receives %s instead", async (expected, gotten) => {
+ const stream = new ObservableStream(
+ new Observable((observer) => {
+ observer.next(1);
+ observer.next(2);
+ // @ts-ignore
+ observer[gotten](3);
+ })
+ );
+ await expect(stream.takeNext()).resolves.toBe(1);
+ await expect(stream.takeNext()).resolves.toBe(2);
+ // @ts-ignore
+ await expect(stream[expected]()).rejects.toEqual(expect.any(Error));
+});
+
+it.each([
+ ["takeNext", "next"],
+ ["takeError", "error"],
+ ["takeComplete", "complete"],
+])("succeeds when %s, receives %s", async (expected, gotten) => {
+ const stream = new ObservableStream(
+ new Observable((observer) => {
+ observer.next(1);
+ observer.next(2);
+ // @ts-ignore
+ observer[gotten](3);
+ })
+ );
+ await expect(stream.takeNext()).resolves.toBe(1);
+ await expect(stream.takeNext()).resolves.toBe(2);
+ // @ts-ignore this should just not throw
+ await stream[expected]();
+});
diff --git a/src/testing/internal/index.ts b/src/testing/internal/index.ts
index 4dd162e2ca9..73a9a00ff0e 100644
--- a/src/testing/internal/index.ts
+++ b/src/testing/internal/index.ts
@@ -1,2 +1,3 @@
export * from "./profile/index.js";
export * from "./disposables/index.js";
+export { ObservableStream } from "./ObservableStream.js";
diff --git a/src/utilities/observables/__tests__/asyncMap.ts b/src/utilities/observables/__tests__/asyncMap.ts
index d4a96bb0b23..e54b2140e66 100644
--- a/src/utilities/observables/__tests__/asyncMap.ts
+++ b/src/utilities/observables/__tests__/asyncMap.ts
@@ -1,7 +1,7 @@
import { Observable } from "../Observable";
import { asyncMap } from "../asyncMap";
import { itAsync } from "../../../testing";
-
+import { ObservableStream } from "../../../testing/internal";
const wait = (delayMs: number) =>
new Promise((resolve) => setTimeout(resolve, delayMs));
@@ -141,4 +141,125 @@ describe("asyncMap", () => {
}),
});
});
+
+ test.each([
+ ["sync", (n: number) => n * 2],
+ ["async", async (n: number) => n * 2],
+ ])("[%s] mapFn maps over values", async (_, mapFn) => {
+ const observable = new Observable((observer) => {
+ observer.next(1);
+ observer.next(2);
+ setTimeout(() => {
+ observer.next(3);
+ setTimeout(() => {
+ observer.next(4);
+ observer.complete();
+ }, 10);
+ }, 10);
+ });
+ const mapped = asyncMap(observable, mapFn);
+ const stream = new ObservableStream(mapped);
+ await expect(stream.takeNext()).resolves.toBe(2);
+ await expect(stream.takeNext()).resolves.toBe(4);
+ await expect(stream.takeNext()).resolves.toBe(6);
+ await expect(stream.takeNext()).resolves.toBe(8);
+ await stream.takeComplete();
+ });
+
+ test.each([["sync"], ["async"]])(
+ "[%s] mapFn notifies the observer with an error when an error is thrown inside the mapFn",
+ async (synchronity) => {
+ const observable = new Observable((observer) => {
+ observer.next(1);
+ observer.next(2);
+ setTimeout(() => {
+ // this will throw
+ observer.next(3);
+ // this will be swallowed and also not call `mapFn` anymore
+ observer.next(4);
+ setTimeout(() => {
+ observer.next(5);
+ observer.complete();
+ }, 10);
+ }, 10);
+ });
+ let lastMapped = 0;
+ const mapped = asyncMap(
+ observable,
+ synchronity === "sync"
+ ? (n: number) => {
+ lastMapped = n;
+ if (n === 3) throw new Error("expected");
+ return n * 2;
+ }
+ : async (n: number) => {
+ lastMapped = n;
+ if (n === 3) throw new Error("expected");
+ return n * 2;
+ }
+ );
+ const stream = new ObservableStream(mapped);
+ await expect(stream.takeNext()).resolves.toBe(2);
+ await expect(stream.takeNext()).resolves.toBe(4);
+ await expect(stream.takeError()).resolves.toEqual(new Error("expected"));
+ // no more emits
+ expect(stream.take()).rejects.toMatch(/timeout/i);
+ // the observer was closed after the error, so we don't expect `mapFn` to
+ // be called for values that will not be emitted
+ expect(lastMapped).toBe(3);
+ }
+ );
+
+ test.each([
+ ["sync", () => 99],
+ ["async", async () => 99],
+ ])(
+ "[%s] catchFn notifies the observer with a value when `catchFn` returns a value instead of re-throwing",
+ async (_, catchFn) => {
+ const observable = new Observable((observer) => {
+ observer.next(1);
+ observer.next(2);
+ setTimeout(() => {
+ observer.error(new Error("expected"));
+ // will be ignored by parent Observable since the observer already closed
+ observer.next(4);
+ }, 10);
+ });
+ const mapped = asyncMap(observable, (n) => n * 2, catchFn);
+ const stream = new ObservableStream(mapped);
+ await expect(stream.takeNext()).resolves.toBe(2);
+ await expect(stream.takeNext()).resolves.toBe(4);
+ await expect(stream.takeNext()).resolves.toBe(99);
+ // even after recovery, further `.next` inside the observer will be ignored
+ // by the parent Observable itself, so asyncMap cannot do anything about that
+ expect(stream.take()).rejects.toMatch(/timeout/i);
+ }
+ );
+
+ test.each([
+ // prettier-ignore
+ ["sync", () => { throw new Error("another error") }],
+ // prettier-ignore
+ ["async", async () => { throw new Error("another error") }],
+ ])("[%s] catchFn can map one error to another error", async (_, catchFn) => {
+ const observable = new Observable((observer) => {
+ observer.next(1);
+ observer.next(2);
+ setTimeout(() => {
+ observer.error(new Error("expected"));
+ // will be ignored by Observable since the observer already closed
+ observer.next(4);
+ }, 10);
+ });
+ const mapped = asyncMap(observable, (n) => n * 2, catchFn);
+ const stream = new ObservableStream(mapped);
+ await expect(stream.takeNext()).resolves.toBe(2);
+ await expect(stream.takeNext()).resolves.toBe(4);
+ await expect(stream.takeError()).resolves.toEqual(
+ new Error("another error")
+ );
+ // even after recovery, further `.next` inside the observer will be ignored
+ // by the Observable itself, so asyncMap cannot do anything about that
+ expect(stream.take()).rejects.toMatch(/timeout/i);
+ });
});
diff --git a/src/utilities/observables/asyncMap.ts b/src/utilities/observables/asyncMap.ts
index 5f77ffa1143..5bf24350a5e 100644
--- a/src/utilities/observables/asyncMap.ts
+++ b/src/utilities/observables/asyncMap.ts
@@ -9,9 +9,6 @@ export function asyncMap(
catchFn?: (error: any) => R | PromiseLike
): Observable {
return new Observable((observer) => {
- const { next, error, complete } = observer;
- let activeCallbackCount = 0;
- let completed = false;
let promiseQueue = {
// Normally we would initialize promiseQueue to Promise.resolve(), but
// in this case, for backwards compatibility, we need to be careful to
@@ -23,44 +20,34 @@ export function asyncMap(
function makeCallback(
examiner: typeof mapFn | typeof catchFn,
- delegate: typeof next | typeof error
+ key: "next" | "error"
): (arg: any) => void {
- if (examiner) {
- return (arg) => {
- ++activeCallbackCount;
- const both = () => examiner(arg);
- promiseQueue = promiseQueue
- .then(both, both)
- .then(
- (result) => {
- --activeCallbackCount;
- next && next.call(observer, result);
- if (completed) {
- handler.complete!();
- }
- },
- (error) => {
- --activeCallbackCount;
- throw error;
- }
- )
- .catch((caught) => {
- error && error.call(observer, caught);
- });
- };
- } else {
- return (arg) => delegate && delegate.call(observer, arg);
- }
+ return (arg) => {
+ if (examiner) {
+ const both = () =>
+ // If the observer is closed, we don't want to continue calling the
+ // mapping function - it's result will be swallowed anyways.
+ observer.closed
+ ? /* will be swallowed */ (0 as any)
+ : examiner(arg);
+
+ promiseQueue = promiseQueue.then(both, both).then(
+ (result) => observer.next(result),
+ (error) => observer.error(error)
+ );
+ } else {
+ observer[key](arg);
+ }
+ };
}
const handler: Observer = {
- next: makeCallback(mapFn, next),
- error: makeCallback(catchFn, error),
+ next: makeCallback(mapFn, "next"),
+ error: makeCallback(catchFn, "error"),
complete() {
- completed = true;
- if (!activeCallbackCount) {
- complete && complete.call(observer);
- }
+ // no need to reassign `promiseQueue`, after `observer.complete`,
+ // the observer will be closed and short-circuit everything anyways
+ /*promiseQueue = */ promiseQueue.then(() => observer.complete());
},
};