From 6e69dfcf50c62a513a537cb8bdbb323e6d27eb38 Mon Sep 17 00:00:00 2001 From: Lenz Weber-Tronic Date: Mon, 4 Mar 2024 15:23:48 +0100 Subject: [PATCH 01/14] switch data transport to event stream --- .../DataTransportAbstraction.ts | 43 ++++- .../WrapApolloProvider.tsx | 10 +- .../WrappedApolloClient.tsx | 156 +++++++++++++----- .../WrappedInMemoryCache.tsx | 44 ++--- .../src/DataTransportAbstraction/index.ts | 5 +- .../ApolloRehydrateSymbols.tsx | 2 + .../ManualDataTransport.tsx | 46 ++++-- .../RehydrationContext.tsx | 22 +-- .../dataTransport.ts | 18 +- .../ExperimentalManualDataTransport/types.tsx | 8 +- packages/client-react-streaming/src/index.ts | 1 + 11 files changed, 216 insertions(+), 139 deletions(-) diff --git a/packages/client-react-streaming/src/DataTransportAbstraction/DataTransportAbstraction.ts b/packages/client-react-streaming/src/DataTransportAbstraction/DataTransportAbstraction.ts index be9f6cc0..7dae8092 100644 --- a/packages/client-react-streaming/src/DataTransportAbstraction/DataTransportAbstraction.ts +++ b/packages/client-react-streaming/src/DataTransportAbstraction/DataTransportAbstraction.ts @@ -1,5 +1,9 @@ import type React from "react"; -import type { Cache, WatchQueryOptions } from "@apollo/client/index.js"; +import type { + FetchResult, + Observable, + WatchQueryOptions, +} from "@apollo/client/index.js"; import { createContext } from "react"; interface DataTransportAbstraction { @@ -20,20 +24,41 @@ export type DataTransportProviderImplementation< > = React.FC< { /** will be present in the Browser */ - onRequestStarted?: (options: WatchQueryOptions) => void; - /** will be present in the Browser */ - onRequestData?: (options: Cache.WriteOptions) => void; + onQueryEvent?: (event: QueryEvent) => void; /** will be present in the Browser */ rerunSimulatedQueries?: () => void; /** will be present during SSR */ registerDispatchRequestStarted?: ( - callback: (options: WatchQueryOptions) => void - ) => void; - /** will be present during SSR */ - registerDispatchRequestData?: ( - callback: (options: Cache.WriteOptions) => void + callback: (query: { + options: WatchQueryOptions; + observable: Observable>; + id: TransportIdentifier; + }) => void ) => void; /** will always be present */ children: React.ReactNode; } & ExtraProps >; + +export type TransportIdentifier = string & { __transportIdentifier: true }; + +export type QueryEvent = + | { + type: "started"; + options: WatchQueryOptions; + id: TransportIdentifier; + } + | { + type: "data"; + id: TransportIdentifier; + result: FetchResult; + } + | { + type: "error"; + id: TransportIdentifier; + error: Error; + } + | { + type: "complete"; + id: TransportIdentifier; + }; diff --git a/packages/client-react-streaming/src/DataTransportAbstraction/WrapApolloProvider.tsx b/packages/client-react-streaming/src/DataTransportAbstraction/WrapApolloProvider.tsx index 1de9366c..0a033bff 100644 --- a/packages/client-react-streaming/src/DataTransportAbstraction/WrapApolloProvider.tsx +++ b/packages/client-react-streaming/src/DataTransportAbstraction/WrapApolloProvider.tsx @@ -47,12 +47,12 @@ export function WrapApolloProvider( return ( + event.type === "started" + ? clientRef.current!.onQueryStarted!(event) + : clientRef.current?.onQueryProgress!(event) } + rerunSimulatedQueries={clientRef.current.rerunSimulatedQueries} registerDispatchRequestStarted={ clientRef.current.watchQueryQueue?.register } diff --git a/packages/client-react-streaming/src/DataTransportAbstraction/WrappedApolloClient.tsx b/packages/client-react-streaming/src/DataTransportAbstraction/WrappedApolloClient.tsx index e3712b52..4281fa08 100644 --- a/packages/client-react-streaming/src/DataTransportAbstraction/WrappedApolloClient.tsx +++ b/packages/client-react-streaming/src/DataTransportAbstraction/WrappedApolloClient.tsx @@ -1,3 +1,4 @@ +/* eslint-disable prefer-rest-params */ import type { ApolloClientOptions, OperationVariables, @@ -18,6 +19,11 @@ import { createBackpressuredCallback } from "./backpressuredCallback.js"; import { InMemoryCache } from "./WrappedInMemoryCache.js"; import { hookWrappers } from "./hooks.js"; import type { HookWrappers } from "@apollo/client/react/internal/index.js"; +import type { QueryInfo } from "@apollo/client/core/QueryInfo.js"; +import type { + QueryEvent, + TransportIdentifier, +} from "./DataTransportAbstraction.js"; function getQueryManager( client: OrigApolloClient @@ -49,7 +55,11 @@ class ApolloClientBase extends OrigApolloClient { } class ApolloClientSSRImpl extends ApolloClientBase { - watchQueryQueue = createBackpressuredCallback>(); + watchQueryQueue = createBackpressuredCallback<{ + options: WatchQueryOptions; + observable: Observable; + id: TransportIdentifier; + }>(); watchQuery< T = any, @@ -59,7 +69,32 @@ class ApolloClientSSRImpl extends ApolloClientBase { options.fetchPolicy !== "cache-only" && options.fetchPolicy !== "standby" ) { - this.watchQueryQueue.push(options); + const observableQuery = super.watchQuery(options); + const queryInfo = observableQuery["queryInfo"] as QueryInfo; + const streamObservable = new Observable>( + (subscriber) => { + const { markResult, markError, markReady } = queryInfo; + queryInfo.markResult = function (result) { + subscriber.next(result); + return markResult.apply(queryInfo, arguments as any); + }; + queryInfo.markError = function (error) { + subscriber.error(error); + return markError.apply(queryInfo, arguments as any); + }; + queryInfo.markReady = function () { + subscriber.complete(); + return markReady.apply(queryInfo, arguments as any); + }; + } + ); + + this.watchQueryQueue.push({ + options, + observable: streamObservable, + id: queryInfo.queryId as TransportIdentifier, + }); + return observableQuery; } return super.watchQuery(options); } @@ -68,7 +103,14 @@ class ApolloClientSSRImpl extends ApolloClientBase { export class ApolloClientBrowserImpl< TCacheShape, > extends ApolloClientBase { - private simulatedStreamingQueries = new Map(); + private simulatedStreamingQueries = new Map< + TransportIdentifier, + SimulatedQueryInfo + >(); + private transportedQueryOptions = new Map< + TransportIdentifier, + WatchQueryOptions + >(); private identifyUniqueQuery(options: { query: DocumentNode; @@ -93,8 +135,12 @@ export class ApolloClientBrowserImpl< return { query: serverQuery, cacheKey, varJson: canonicalVariables }; } - protected onRequestStarted = (options: WatchQueryOptions) => { + protected onQueryStarted = ({ + options, + id, + }: Extract) => { const { query, varJson, cacheKey } = this.identifyUniqueQuery(options); + this.transportedQueryOptions.set(id, options); if (!query) return; const printedServerQuery = print(query); @@ -116,16 +162,13 @@ export class ApolloClientBrowserImpl< varJson ); - if ( - this.simulatedStreamingQueries.get(cacheKey) === - simulatedStreamingQuery - ) - this.simulatedStreamingQueries.delete(cacheKey); + if (this.simulatedStreamingQueries.get(id) === simulatedStreamingQuery) + this.simulatedStreamingQueries.delete(id); }; const promise = new Promise((resolve, reject) => { this.simulatedStreamingQueries.set( - cacheKey, + id, (simulatedStreamingQuery = { resolve, reject, options }) ); }); @@ -151,7 +194,7 @@ export class ApolloClientBrowserImpl< queryManager["fetchCancelFns"].set( cacheKey, (fetchCancelFn = (reason: unknown) => { - const { reject } = this.simulatedStreamingQueries.get(cacheKey) ?? {}; + const { reject } = this.simulatedStreamingQueries.get(id) ?? {}; if (reject) { reject(reason); } @@ -161,20 +204,46 @@ export class ApolloClientBrowserImpl< } }; - protected onRequestData = (data: Cache.WriteOptions) => { - const { cacheKey } = this.identifyUniqueQuery(data); - const { resolve } = this.simulatedStreamingQueries.get(cacheKey) ?? {}; + protected onQueryProgress = ( + event: Exclude + ) => { + const queryInfo = this.simulatedStreamingQueries.get(event.id); - if (resolve) { - resolve({ - data: data.result, + if (event.type === "data") { + queryInfo?.resolve?.({ + data: event.result, }); + + // In order to avoid a scenario where the promise resolves without + // a query subscribing to the promise, we immediately call + // `cache.write` here. + // For more information, see: https://github.com/apollographql/apollo-client-nextjs/pull/38/files/388813a16e2ac5c62408923a1face9ae9417d92a#r1229870523 + const options = this.transportedQueryOptions.get(event.id); + if (options) { + this.cache.writeQuery({ + query: options.query, + data: event.result.data, + variables: options.variables, + }); + } + } else if (event.type === "error") { + /** + * At this point we're not able to correctly serialize the error over the wire + * so we do the next-best thing: restart the query in the browser as soon as it + * failed on the server. + * See https://github.com/apollographql/apollo-client-nextjs/issues/52 + */ + if (queryInfo) { + invariant.debug( + "query failed on server, rerunning in browser:", + queryInfo.options + ); + this.rerunSimulatedQuery(queryInfo); + } + this.transportedQueryOptions.delete(event.id); + } else if (event.type === "complete") { + this.transportedQueryOptions.delete(event.id); } - // In order to avoid a scenario where the promise resolves without - // a query subscribing to the promise, we immediately call - // `cache.write` here. - // For more information, see: https://github.com/apollographql/apollo-client-nextjs/pull/38/files/388813a16e2ac5c62408923a1face9ae9417d92a#r1229870523 - this.cache.write(data); }; /** @@ -183,36 +252,45 @@ export class ApolloClientBrowserImpl< * Those queries will be cancelled and then re-run in the browser. */ protected rerunSimulatedQueries = () => { - const queryManager = getQueryManager(this); - for (const [cacheKey, queryInfo] of this.simulatedStreamingQueries) { - this.simulatedStreamingQueries.delete(cacheKey); + for (const [id, queryInfo] of this.simulatedStreamingQueries) { + this.simulatedStreamingQueries.delete(id); invariant.debug( "streaming connection closed before server query could be fully transported, rerunning:", queryInfo.options ); - const queryId = queryManager.generateQueryId(); - queryManager - .fetchQuery(queryId, { - ...queryInfo.options, - context: { - ...queryInfo.options.context, - queryDeduplication: false, - }, - }) - .finally(() => queryManager.stopQuery(queryId)) - .then(queryInfo.resolve, queryInfo.reject); + this.rerunSimulatedQuery(queryInfo); } }; + protected rerunSimulatedQuery = (queryInfo: SimulatedQueryInfo) => { + const queryManager = getQueryManager(this); + const queryId = queryManager.generateQueryId(); + queryManager + .fetchQuery(queryId, { + ...queryInfo.options, + context: { + ...queryInfo.options.context, + queryDeduplication: false, + }, + }) + .finally(() => queryManager.stopQuery(queryId)) + .then(queryInfo.resolve, queryInfo.reject); + }; } export type ApolloClient = OrigApolloClient & { - onRequestStarted?: ApolloClientBrowserImpl["onRequestStarted"]; - onRequestData?: ApolloClientBrowserImpl["onRequestData"]; + onQueryStarted?: ApolloClientBrowserImpl["onQueryStarted"]; + onQueryProgress?: ApolloClientBrowserImpl["onQueryProgress"]; rerunSimulatedQueries?: ApolloClientBrowserImpl["rerunSimulatedQueries"]; watchQueryQueue: { register?: ( - instance: ((options: Cache.WriteOptions) => void) | null + instance: + | ((_: { + options: Cache.WriteOptions; + observable: Observable>; + id: TransportIdentifier; + }) => void) + | null ) => void; }; diff --git a/packages/client-react-streaming/src/DataTransportAbstraction/WrappedInMemoryCache.tsx b/packages/client-react-streaming/src/DataTransportAbstraction/WrappedInMemoryCache.tsx index 34c00668..f11cc77e 100644 --- a/packages/client-react-streaming/src/DataTransportAbstraction/WrappedInMemoryCache.tsx +++ b/packages/client-react-streaming/src/DataTransportAbstraction/WrappedInMemoryCache.tsx @@ -1,35 +1,11 @@ -import type { - InMemoryCacheConfig, - Cache, - Reference, -} from "@apollo/client/index.js"; import { InMemoryCache as OrigInMemoryCache } from "@apollo/client/index.js"; -import { createBackpressuredCallback } from "./backpressuredCallback.js"; - -class InMemoryCacheSSRImpl extends OrigInMemoryCache { - protected writeQueue = createBackpressuredCallback(); - - constructor(config?: InMemoryCacheConfig) { - super(config); - } - - write(options: Cache.WriteOptions): Reference | undefined { - this.writeQueue.push(options); - return super.write(options); - } -} - -export type InMemoryCache = OrigInMemoryCache & { - writeQueue?: { - register?: ( - instance: ((options: Cache.WriteOptions) => void) | null - ) => void; - }; -}; - -export const InMemoryCache: { - new (config?: InMemoryCacheConfig): InMemoryCache; -} = - /*#__PURE__*/ process.env.REACT_ENV === "ssr" - ? InMemoryCacheSSRImpl - : OrigInMemoryCache; +/** + * We just subclass `InMemoryCache` here so that `WrappedApolloClient` + * can detect if it was initialized with an `InMemoryCache` instance that + * was also exported from this package. + * Right now, we don't have extra logic here, but we might have so again + * in the future. + * So we want to enforce this import path from the start to prevent future + * subtle bugs if people update the package and don't read the patch notes. + */ +export class InMemoryCache extends OrigInMemoryCache {} diff --git a/packages/client-react-streaming/src/DataTransportAbstraction/index.ts b/packages/client-react-streaming/src/DataTransportAbstraction/index.ts index 5dc26b3f..3c26b41e 100644 --- a/packages/client-react-streaming/src/DataTransportAbstraction/index.ts +++ b/packages/client-react-streaming/src/DataTransportAbstraction/index.ts @@ -4,5 +4,8 @@ export { ApolloClient } from "./WrappedApolloClient.js"; export { resetApolloSingletons } from "./testHelpers.js"; export { DataTransportContext } from "./DataTransportAbstraction.js"; -export type { DataTransportProviderImplementation } from "./DataTransportAbstraction.js"; +export type { + DataTransportProviderImplementation, + QueryEvent, +} from "./DataTransportAbstraction.js"; export { WrapApolloProvider } from "./WrapApolloProvider.js"; diff --git a/packages/client-react-streaming/src/ExperimentalManualDataTransport/ApolloRehydrateSymbols.tsx b/packages/client-react-streaming/src/ExperimentalManualDataTransport/ApolloRehydrateSymbols.tsx index 2a07192a..97108632 100644 --- a/packages/client-react-streaming/src/ExperimentalManualDataTransport/ApolloRehydrateSymbols.tsx +++ b/packages/client-react-streaming/src/ExperimentalManualDataTransport/ApolloRehydrateSymbols.tsx @@ -1,3 +1,5 @@ +// eslint-disable-next-line @typescript-eslint/ban-ts-comment +// @ts-ignore depending on the superjson version, this might not be right import type { SuperJSONResult } from "superjson"; import type { DataTransport } from "./dataTransport.js"; diff --git a/packages/client-react-streaming/src/ExperimentalManualDataTransport/ManualDataTransport.tsx b/packages/client-react-streaming/src/ExperimentalManualDataTransport/ManualDataTransport.tsx index 9924dab3..d5a020af 100644 --- a/packages/client-react-streaming/src/ExperimentalManualDataTransport/ManualDataTransport.tsx +++ b/packages/client-react-streaming/src/ExperimentalManualDataTransport/ManualDataTransport.tsx @@ -1,7 +1,6 @@ import React, { useCallback, useEffect, useId, useMemo, useRef } from "react"; import type { DataTransportProviderImplementation } from "@apollo/client-react-streaming"; import { DataTransportContext } from "@apollo/client-react-streaming"; -import type { Cache, WatchQueryOptions } from "@apollo/client/index.js"; import type { RehydrationCache, RehydrationContextValue } from "./types.js"; import type { HydrationContextOptions } from "./RehydrationContext.js"; import { buildApolloRehydrationContext } from "./RehydrationContext.js"; @@ -21,7 +20,6 @@ const buildManualDataTransportSSRImpl = ({ function ManualDataTransportSSRImpl({ extraScriptProps, children, - registerDispatchRequestData, registerDispatchRequestStarted, }) { const insertHtml = useInsertHtml(); @@ -34,11 +32,34 @@ const buildManualDataTransportSSRImpl = ({ }); } - registerDispatchRequestStarted!((options: WatchQueryOptions) => { - rehydrationContext.current!.incomingBackgroundQueries.push(options); - }); - registerDispatchRequestData!((options: Cache.WriteOptions) => { - rehydrationContext.current!.incomingResults.push(options); + registerDispatchRequestStarted!(({ options, id, observable }) => { + rehydrationContext.current!.incomingEvents.push({ + type: "started", + id, + options, + }); + observable.subscribe({ + next(value) { + rehydrationContext.current!.incomingEvents.push({ + type: "data", + id, + result: value, + }); + }, + error(error) { + rehydrationContext.current!.incomingEvents.push({ + type: "error", + id, + error, + }); + }, + complete() { + rehydrationContext.current!.incomingEvents.push({ + type: "complete", + id, + }); + }, + }); }); const contextValue = useMemo( @@ -63,19 +84,12 @@ const buildManualDataTransportBrowserImpl = (): DataTransportProviderImplementation => function ManualDataTransportBrowserImpl({ children, - onRequestStarted, - onRequestData, + onQueryEvent, rerunSimulatedQueries, }) { const hookRehydrationCache = useRef({}); - registerDataTransport({ - onRequestStarted: (options) => { - // we are not streaming anymore, so we should not simulate "server-side requests" - if (document.readyState === "complete") return; - onRequestStarted!(options); - }, - onRequestData: onRequestData!, + onQueryEvent: onQueryEvent!, onRehydrate(rehydrate) { Object.assign(hookRehydrationCache.current, rehydrate); }, diff --git a/packages/client-react-streaming/src/ExperimentalManualDataTransport/RehydrationContext.tsx b/packages/client-react-streaming/src/ExperimentalManualDataTransport/RehydrationContext.tsx index d098b1fe..325a14cc 100644 --- a/packages/client-react-streaming/src/ExperimentalManualDataTransport/RehydrationContext.tsx +++ b/packages/client-react-streaming/src/ExperimentalManualDataTransport/RehydrationContext.tsx @@ -37,28 +37,19 @@ export function buildApolloRehydrationContext({ currentlyInjected: false, transportValueData: getTransportObject(ensureInserted), transportedValues: {}, - incomingResults: getTransportArray(ensureInserted), - incomingBackgroundQueries: getTransportArray(ensureInserted), + incomingEvents: getTransportArray(ensureInserted), RehydrateOnClient() { rehydrationContext.currentlyInjected = false; if ( !Object.keys(rehydrationContext.transportValueData).length && - !Object.keys(rehydrationContext.incomingResults).length && - !Object.keys(rehydrationContext.incomingBackgroundQueries).length + !Object.keys(rehydrationContext.incomingEvents).length ) return <>; invariant.debug( "transporting data", rehydrationContext.transportValueData ); - invariant.debug( - "transporting results", - rehydrationContext.incomingResults - ); - invariant.debug( - "transporting incomingBackgroundQueries", - rehydrationContext.incomingBackgroundQueries - ); + invariant.debug("transporting events", rehydrationContext.incomingEvents); const __html = transportDataToJS({ rehydrate: Object.fromEntries( @@ -67,8 +58,7 @@ export function buildApolloRehydrationContext({ rehydrationContext.transportedValues[key] !== value ) ), - results: rehydrationContext.incomingResults, - backgroundQueries: rehydrationContext.incomingBackgroundQueries, + events: rehydrationContext.incomingEvents, }); Object.assign( rehydrationContext.transportedValues, @@ -76,9 +66,7 @@ export function buildApolloRehydrationContext({ ); rehydrationContext.transportValueData = getTransportObject(ensureInserted); - rehydrationContext.incomingResults = getTransportArray(ensureInserted); - rehydrationContext.incomingBackgroundQueries = - getTransportArray(ensureInserted); + rehydrationContext.incomingEvents = getTransportArray(ensureInserted); return (