Skip to content

Commit

Permalink
Rewrite responseIteratorNoAsyncIterator tests with ObservableStream
Browse files Browse the repository at this point in the history
  • Loading branch information
jerelmiller committed Nov 27, 2024
1 parent f42a6b7 commit cbfaca7
Showing 1 changed file with 156 additions and 158 deletions.
314 changes: 156 additions & 158 deletions src/link/http/__tests__/responseIteratorNoAsyncIterator.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import gql from "graphql-tag";
import { execute } from "../../core/execute";
import { HttpLink } from "../HttpLink";
import { itAsync, subscribeAndCount } from "../../../testing";
import type { Observable } from "zen-observable-ts";
import { TextEncoder, TextDecoder } from "util";
import { ReadableStream } from "web-streams-polyfill";
import { Readable } from "stream";
import { ObservableStream } from "../../../testing/internal";

// As of Jest 26 there is no way to mock/unmock a module that is used indirectly
// via a single test file.
Expand Down Expand Up @@ -34,28 +33,6 @@ const sampleDeferredQuery = gql`

const BOUNDARY = "gc0p4Jq0M2Yt08jU534c0p";

function matchesResults<T>(
resolve: () => void,
reject: (err: any) => void,
observable: Observable<T>,
results: Array<T>
) {
// TODO: adding a second observer to the observable will consume the
// observable. I want to test completion, but the subscribeAndCount API
// doesn’t have anything like that.
subscribeAndCount(reject, observable, (count, result) => {
// subscribeAndCount is 1-indexed for some terrible reason.
if (0 >= count || count > results.length) {
reject(new Error("Unexpected result"));
}

expect(result).toEqual(results[count - 1]);
if (count === results.length) {
resolve();
}
});
}

describe("multipart responses", () => {
let originalTextDecoder: any;
beforeAll(() => {
Expand Down Expand Up @@ -180,7 +157,7 @@ describe("multipart responses", () => {
},
];

itAsync("can handle whatwg stream bodies", (resolve, reject) => {
it("can handle whatwg stream bodies", async () => {
const stream = new ReadableStream({
async start(controller) {
const lines = bodyCustomBoundary.split("\r\n");
Expand All @@ -207,146 +184,167 @@ describe("multipart responses", () => {
});

const observable = execute(link, { query: sampleDeferredQuery });
matchesResults(resolve, reject, observable, results);
const observableStream = new ObservableStream(observable);

for (const result of results) {
await expect(observableStream).toEmitValue(result);
}

await expect(observableStream).toComplete();
});

itAsync(
"can handle whatwg stream bodies with arbitrary splits",
(resolve, reject) => {
const stream = new ReadableStream({
async start(controller) {
let chunks: Array<string> = [];
let chunkSize = 15;
for (let i = 0; i < bodyCustomBoundary.length; i += chunkSize) {
chunks.push(bodyCustomBoundary.slice(i, i + chunkSize));
}
it("can handle whatwg stream bodies with arbitrary splits", async () => {
const stream = new ReadableStream({
async start(controller) {
let chunks: Array<string> = [];
let chunkSize = 15;
for (let i = 0; i < bodyCustomBoundary.length; i += chunkSize) {
chunks.push(bodyCustomBoundary.slice(i, i + chunkSize));
}

try {
for (const chunk of chunks) {
controller.enqueue(chunk);
}
} finally {
controller.close();
try {
for (const chunk of chunks) {
controller.enqueue(chunk);
}
},
});

const fetch = jest.fn(async () => ({
status: 200,
body: stream,
headers: new Headers({
"content-type": `multipart/mixed; boundary=${BOUNDARY}`,
}),
}));

const link = new HttpLink({
fetch: fetch as any,
});

const observable = execute(link, { query: sampleDeferredQuery });
matchesResults(resolve, reject, observable, results);
} finally {
controller.close();
}
},
});

const fetch = jest.fn(async () => ({
status: 200,
body: stream,
headers: new Headers({
"content-type": `multipart/mixed; boundary=${BOUNDARY}`,
}),
}));

const link = new HttpLink({
fetch: fetch as any,
});

const observable = execute(link, { query: sampleDeferredQuery });
const observableStream = new ObservableStream(observable);

for (const result of results) {
await expect(observableStream).toEmitValue(result);
}
);

itAsync(
"can handle node stream bodies (strings) with default boundary",
(resolve, reject) => {
const stream = Readable.from(
bodyDefaultBoundary.split("\r\n").map((line) => line + "\r\n")
);

const fetch = jest.fn(async () => ({
status: 200,
body: stream,
// if no boundary is specified, default to -
headers: new Headers({
"content-type": `multipart/mixed`,
}),
}));
const link = new HttpLink({
fetch: fetch as any,
});

const observable = execute(link, { query: sampleDeferredQuery });
matchesResults(resolve, reject, observable, results);

await expect(observableStream).toComplete();
});

it("can handle node stream bodies (strings) with default boundary", async () => {
const stream = Readable.from(
bodyDefaultBoundary.split("\r\n").map((line) => line + "\r\n")
);

const fetch = jest.fn(async () => ({
status: 200,
body: stream,
// if no boundary is specified, default to -
headers: new Headers({
"content-type": `multipart/mixed`,
}),
}));
const link = new HttpLink({
fetch: fetch as any,
});

const observable = execute(link, { query: sampleDeferredQuery });
const observableStream = new ObservableStream(observable);

for (const result of results) {
await expect(observableStream).toEmitValue(result);
}
);

itAsync(
"can handle node stream bodies (strings) with arbitrary splits",
(resolve, reject) => {
let chunks: Array<string> = [];
let chunkSize = 15;
for (let i = 0; i < bodyCustomBoundary.length; i += chunkSize) {
chunks.push(bodyCustomBoundary.slice(i, i + chunkSize));
}
const stream = Readable.from(chunks);

const fetch = jest.fn(async () => ({
status: 200,
body: stream,
headers: new Headers({
"content-type": `multipart/mixed; boundary=${BOUNDARY}`,
}),
}));
const link = new HttpLink({
fetch: fetch as any,
});

const observable = execute(link, { query: sampleDeferredQuery });
matchesResults(resolve, reject, observable, results);

await expect(observableStream).toComplete();
});

it("can handle node stream bodies (strings) with arbitrary splits", async () => {
let chunks: Array<string> = [];
let chunkSize = 15;
for (let i = 0; i < bodyCustomBoundary.length; i += chunkSize) {
chunks.push(bodyCustomBoundary.slice(i, i + chunkSize));
}
);

itAsync(
"can handle node stream bodies (array buffers)",
(resolve, reject) => {
const stream = Readable.from(
bodyDefaultBoundary
.split("\r\n")
.map((line) => new TextEncoder().encode(line + "\r\n"))
);

const fetch = jest.fn(async () => ({
status: 200,
body: stream,
// if no boundary is specified, default to -
headers: new Headers({
"content-type": `multipart/mixed`,
}),
}));
const link = new HttpLink({
fetch: fetch as any,
});

const observable = execute(link, { query: sampleDeferredQuery });
matchesResults(resolve, reject, observable, results);
const stream = Readable.from(chunks);

const fetch = jest.fn(async () => ({
status: 200,
body: stream,
headers: new Headers({
"content-type": `multipart/mixed; boundary=${BOUNDARY}`,
}),
}));
const link = new HttpLink({
fetch: fetch as any,
});

const observable = execute(link, { query: sampleDeferredQuery });
const observableStream = new ObservableStream(observable);

for (const result of results) {
await expect(observableStream).toEmitValue(result);
}
);

itAsync(
"can handle node stream bodies (array buffers) with batched results",
(resolve, reject) => {
const stream = Readable.from(
bodyBatchedResults
.split("\r\n")
.map((line) => new TextEncoder().encode(line + "\r\n"))
);

const fetch = jest.fn(async () => ({
status: 200,
body: stream,
// if no boundary is specified, default to -
headers: new Headers({
"Content-Type": `multipart/mixed;boundary="graphql";deferSpec=20220824`,
}),
}));
const link = new HttpLink({
fetch: fetch as any,
});

const observable = execute(link, { query: sampleDeferredQuery });
matchesResults(resolve, reject, observable, batchedResults);

await expect(observableStream).toComplete();
});

it("can handle node stream bodies (array buffers)", async () => {
const stream = Readable.from(
bodyDefaultBoundary
.split("\r\n")
.map((line) => new TextEncoder().encode(line + "\r\n"))
);

const fetch = jest.fn(async () => ({
status: 200,
body: stream,
// if no boundary is specified, default to -
headers: new Headers({
"content-type": `multipart/mixed`,
}),
}));
const link = new HttpLink({
fetch: fetch as any,
});

const observable = execute(link, { query: sampleDeferredQuery });
const observableStream = new ObservableStream(observable);

for (const result of results) {
await expect(observableStream).toEmitValue(result);
}
);

await expect(observableStream).toComplete();
});

it("can handle node stream bodies (array buffers) with batched results", async () => {
const stream = Readable.from(
bodyBatchedResults
.split("\r\n")
.map((line) => new TextEncoder().encode(line + "\r\n"))
);

const fetch = jest.fn(async () => ({
status: 200,
body: stream,
// if no boundary is specified, default to -
headers: new Headers({
"Content-Type": `multipart/mixed;boundary="graphql";deferSpec=20220824`,
}),
}));
const link = new HttpLink({
fetch: fetch as any,
});

const observable = execute(link, { query: sampleDeferredQuery });
const observableStream = new ObservableStream(observable);

for (const result of batchedResults) {
await expect(observableStream).toEmitValue(result);
}

await expect(observableStream).toComplete();
});
});

0 comments on commit cbfaca7

Please sign in to comment.