From cbfaca7339e97d512b28b5a9f889fbcf29cc7922 Mon Sep 17 00:00:00 2001 From: Jerel Miller Date: Tue, 26 Nov 2024 21:44:45 -0700 Subject: [PATCH] Rewrite responseIteratorNoAsyncIterator tests with ObservableStream --- .../responseIteratorNoAsyncIterator.ts | 314 +++++++++--------- 1 file changed, 156 insertions(+), 158 deletions(-) diff --git a/src/link/http/__tests__/responseIteratorNoAsyncIterator.ts b/src/link/http/__tests__/responseIteratorNoAsyncIterator.ts index 312823cf099..4b3c1268753 100644 --- a/src/link/http/__tests__/responseIteratorNoAsyncIterator.ts +++ b/src/link/http/__tests__/responseIteratorNoAsyncIterator.ts @@ -1,11 +1,10 @@ import gql from "graphql-tag"; import { execute } from "../../core/execute"; import { HttpLink } from "../HttpLink"; -import { itAsync, subscribeAndCount } from "../../../testing"; -import type { Observable } from "zen-observable-ts"; import { TextEncoder, TextDecoder } from "util"; import { ReadableStream } from "web-streams-polyfill"; import { Readable } from "stream"; +import { ObservableStream } from "../../../testing/internal"; // As of Jest 26 there is no way to mock/unmock a module that is used indirectly // via a single test file. @@ -34,28 +33,6 @@ const sampleDeferredQuery = gql` const BOUNDARY = "gc0p4Jq0M2Yt08jU534c0p"; -function matchesResults( - resolve: () => void, - reject: (err: any) => void, - observable: Observable, - results: Array -) { - // TODO: adding a second observer to the observable will consume the - // observable. I want to test completion, but the subscribeAndCount API - // doesn’t have anything like that. - subscribeAndCount(reject, observable, (count, result) => { - // subscribeAndCount is 1-indexed for some terrible reason. - if (0 >= count || count > results.length) { - reject(new Error("Unexpected result")); - } - - expect(result).toEqual(results[count - 1]); - if (count === results.length) { - resolve(); - } - }); -} - describe("multipart responses", () => { let originalTextDecoder: any; beforeAll(() => { @@ -180,7 +157,7 @@ describe("multipart responses", () => { }, ]; - itAsync("can handle whatwg stream bodies", (resolve, reject) => { + it("can handle whatwg stream bodies", async () => { const stream = new ReadableStream({ async start(controller) { const lines = bodyCustomBoundary.split("\r\n"); @@ -207,146 +184,167 @@ describe("multipart responses", () => { }); const observable = execute(link, { query: sampleDeferredQuery }); - matchesResults(resolve, reject, observable, results); + const observableStream = new ObservableStream(observable); + + for (const result of results) { + await expect(observableStream).toEmitValue(result); + } + + await expect(observableStream).toComplete(); }); - itAsync( - "can handle whatwg stream bodies with arbitrary splits", - (resolve, reject) => { - const stream = new ReadableStream({ - async start(controller) { - let chunks: Array = []; - let chunkSize = 15; - for (let i = 0; i < bodyCustomBoundary.length; i += chunkSize) { - chunks.push(bodyCustomBoundary.slice(i, i + chunkSize)); - } + it("can handle whatwg stream bodies with arbitrary splits", async () => { + const stream = new ReadableStream({ + async start(controller) { + let chunks: Array = []; + let chunkSize = 15; + for (let i = 0; i < bodyCustomBoundary.length; i += chunkSize) { + chunks.push(bodyCustomBoundary.slice(i, i + chunkSize)); + } - try { - for (const chunk of chunks) { - controller.enqueue(chunk); - } - } finally { - controller.close(); + try { + for (const chunk of chunks) { + controller.enqueue(chunk); } - }, - }); - - const fetch = jest.fn(async () => ({ - status: 200, - body: stream, - headers: new Headers({ - "content-type": `multipart/mixed; boundary=${BOUNDARY}`, - }), - })); - - const link = new HttpLink({ - fetch: fetch as any, - }); - - const observable = execute(link, { query: sampleDeferredQuery }); - matchesResults(resolve, reject, observable, results); + } finally { + controller.close(); + } + }, + }); + + const fetch = jest.fn(async () => ({ + status: 200, + body: stream, + headers: new Headers({ + "content-type": `multipart/mixed; boundary=${BOUNDARY}`, + }), + })); + + const link = new HttpLink({ + fetch: fetch as any, + }); + + const observable = execute(link, { query: sampleDeferredQuery }); + const observableStream = new ObservableStream(observable); + + for (const result of results) { + await expect(observableStream).toEmitValue(result); } - ); - - itAsync( - "can handle node stream bodies (strings) with default boundary", - (resolve, reject) => { - const stream = Readable.from( - bodyDefaultBoundary.split("\r\n").map((line) => line + "\r\n") - ); - - const fetch = jest.fn(async () => ({ - status: 200, - body: stream, - // if no boundary is specified, default to - - headers: new Headers({ - "content-type": `multipart/mixed`, - }), - })); - const link = new HttpLink({ - fetch: fetch as any, - }); - - const observable = execute(link, { query: sampleDeferredQuery }); - matchesResults(resolve, reject, observable, results); + + await expect(observableStream).toComplete(); + }); + + it("can handle node stream bodies (strings) with default boundary", async () => { + const stream = Readable.from( + bodyDefaultBoundary.split("\r\n").map((line) => line + "\r\n") + ); + + const fetch = jest.fn(async () => ({ + status: 200, + body: stream, + // if no boundary is specified, default to - + headers: new Headers({ + "content-type": `multipart/mixed`, + }), + })); + const link = new HttpLink({ + fetch: fetch as any, + }); + + const observable = execute(link, { query: sampleDeferredQuery }); + const observableStream = new ObservableStream(observable); + + for (const result of results) { + await expect(observableStream).toEmitValue(result); } - ); - - itAsync( - "can handle node stream bodies (strings) with arbitrary splits", - (resolve, reject) => { - let chunks: Array = []; - let chunkSize = 15; - for (let i = 0; i < bodyCustomBoundary.length; i += chunkSize) { - chunks.push(bodyCustomBoundary.slice(i, i + chunkSize)); - } - const stream = Readable.from(chunks); - - const fetch = jest.fn(async () => ({ - status: 200, - body: stream, - headers: new Headers({ - "content-type": `multipart/mixed; boundary=${BOUNDARY}`, - }), - })); - const link = new HttpLink({ - fetch: fetch as any, - }); - - const observable = execute(link, { query: sampleDeferredQuery }); - matchesResults(resolve, reject, observable, results); + + await expect(observableStream).toComplete(); + }); + + it("can handle node stream bodies (strings) with arbitrary splits", async () => { + let chunks: Array = []; + let chunkSize = 15; + for (let i = 0; i < bodyCustomBoundary.length; i += chunkSize) { + chunks.push(bodyCustomBoundary.slice(i, i + chunkSize)); } - ); - - itAsync( - "can handle node stream bodies (array buffers)", - (resolve, reject) => { - const stream = Readable.from( - bodyDefaultBoundary - .split("\r\n") - .map((line) => new TextEncoder().encode(line + "\r\n")) - ); - - const fetch = jest.fn(async () => ({ - status: 200, - body: stream, - // if no boundary is specified, default to - - headers: new Headers({ - "content-type": `multipart/mixed`, - }), - })); - const link = new HttpLink({ - fetch: fetch as any, - }); - - const observable = execute(link, { query: sampleDeferredQuery }); - matchesResults(resolve, reject, observable, results); + const stream = Readable.from(chunks); + + const fetch = jest.fn(async () => ({ + status: 200, + body: stream, + headers: new Headers({ + "content-type": `multipart/mixed; boundary=${BOUNDARY}`, + }), + })); + const link = new HttpLink({ + fetch: fetch as any, + }); + + const observable = execute(link, { query: sampleDeferredQuery }); + const observableStream = new ObservableStream(observable); + + for (const result of results) { + await expect(observableStream).toEmitValue(result); } - ); - - itAsync( - "can handle node stream bodies (array buffers) with batched results", - (resolve, reject) => { - const stream = Readable.from( - bodyBatchedResults - .split("\r\n") - .map((line) => new TextEncoder().encode(line + "\r\n")) - ); - - const fetch = jest.fn(async () => ({ - status: 200, - body: stream, - // if no boundary is specified, default to - - headers: new Headers({ - "Content-Type": `multipart/mixed;boundary="graphql";deferSpec=20220824`, - }), - })); - const link = new HttpLink({ - fetch: fetch as any, - }); - - const observable = execute(link, { query: sampleDeferredQuery }); - matchesResults(resolve, reject, observable, batchedResults); + + await expect(observableStream).toComplete(); + }); + + it("can handle node stream bodies (array buffers)", async () => { + const stream = Readable.from( + bodyDefaultBoundary + .split("\r\n") + .map((line) => new TextEncoder().encode(line + "\r\n")) + ); + + const fetch = jest.fn(async () => ({ + status: 200, + body: stream, + // if no boundary is specified, default to - + headers: new Headers({ + "content-type": `multipart/mixed`, + }), + })); + const link = new HttpLink({ + fetch: fetch as any, + }); + + const observable = execute(link, { query: sampleDeferredQuery }); + const observableStream = new ObservableStream(observable); + + for (const result of results) { + await expect(observableStream).toEmitValue(result); } - ); + + await expect(observableStream).toComplete(); + }); + + it("can handle node stream bodies (array buffers) with batched results", async () => { + const stream = Readable.from( + bodyBatchedResults + .split("\r\n") + .map((line) => new TextEncoder().encode(line + "\r\n")) + ); + + const fetch = jest.fn(async () => ({ + status: 200, + body: stream, + // if no boundary is specified, default to - + headers: new Headers({ + "Content-Type": `multipart/mixed;boundary="graphql";deferSpec=20220824`, + }), + })); + const link = new HttpLink({ + fetch: fetch as any, + }); + + const observable = execute(link, { query: sampleDeferredQuery }); + const observableStream = new ObservableStream(observable); + + for (const result of batchedResults) { + await expect(observableStream).toEmitValue(result); + } + + await expect(observableStream).toComplete(); + }); });