Skip to content

Commit

Permalink
SSE subscription tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bastianjoel committed Mar 4, 2024
1 parent 686d754 commit a9ac43e
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 16 deletions.
102 changes: 89 additions & 13 deletions client/src/app/worker/http/http-subscription-sse.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import fetchMock, { MockRequest } from 'fetch-mock';
import { ErrorDescription, ErrorType } from 'src/app/gateways/http-stream/stream-utils';
import { HttpMethod } from 'src/app/infrastructure/definitions/http';

import { HttpSubscriptionEndpoint } from './http-subscription';
Expand Down Expand Up @@ -44,9 +45,16 @@ function getValidStream(req: MockRequest, interval: number, resolveAfter = -1) {
return new Response(stream);
}

fdescribe(`http subscription polling`, () => {
describe(`http subscription polling`, () => {
beforeEach(() => {
fetchMock.mock(`end:/does-not-resolve`, (_, opts) => getValidStream(opts, 100));
fetchMock.mock(`end:/error400-expected-format`, {
status: 400,
headers: { 'Content-Type': `application/json` },
body: JSON.stringify({
error: { type: `ClientError`, msg: `Example error` }
})
});
});

afterEach(() => fetchMock.reset());
Expand All @@ -67,19 +75,87 @@ fdescribe(`http subscription polling`, () => {
await subscr.stop();
});

xit(`receives error in onError`, async () => {});

xit(`receives error in onData`, async () => {});

xdescribe(`stopping`, () => {
it(`stop while waiting for data`, async () => {});

it(`stop after data received`, async () => {});

it(`instant stop`, async () => {});
it(`receives error in onError`, async () => {
let dataResolver: CallableFunction;
let errorResolver: CallableFunction;
const receivedData = new Promise(resolve => (dataResolver = resolve));
const receivedError = new Promise(resolve => (errorResolver = resolve));
const subscr = getHttpSubscriptionSSEInstance(
`/error400-expected-format`,
(d: any) => dataResolver(d),
(d: any) => errorResolver(d)
);
subscr.start();
await expectAsync(receivedError).toBeResolved();
expectAsync(receivedData).toBePending();
expect((<ErrorDescription>await receivedError)?.type).toEqual(ErrorType.CLIENT);
await subscr.stop();
});

it(`stops on server error`, async () => {});
it(`receives error in onData`, async () => {
let resolver: CallableFunction;
const receivedData = new Promise(resolve => (resolver = resolve));
const subscr = getHttpSubscriptionSSEInstance(`/error400-expected-format`, (d: any) => resolver(d));
subscr.start();
await expectAsync(receivedData).toBeResolved();
expect((<ErrorDescription>await receivedData)?.type).toEqual(ErrorType.CLIENT);
await subscr.stop();
});

it(`stops on client error`, async () => {});
describe(`stopping`, () => {
it(`stop after resolve`, async () => {
fetchMock.mock(`end:/resolves`, (_, opts) => getValidStream(opts, 100, 2));

let resolver: CallableFunction;
const receivedData = new Promise(resolve => (resolver = resolve));
const subscr = getHttpSubscriptionSSEInstance(`/resolves`, () => resolver());
const start = subscr.start();
await receivedData;
await expectAsync(start).toBeResolved();
expect(subscr.active).toBeFalsy();
});

it(`stop after data received`, async () => {
let resolver: CallableFunction;
const receivedData = new Promise(resolve => (resolver = resolve));
const subscr = getHttpSubscriptionSSEInstance(`/does-not-resolve`, () => resolver());
const start = subscr.start();
await receivedData;
await subscr.stop();
return expectAsync(start).toBeResolved();
});

it(`instant stop`, async () => {
const subscr = getHttpSubscriptionSSEInstance(`/does-not-resolve`);
const start = subscr.start();
await subscr.stop();
return expectAsync(start).toBeResolved();
});

it(`stops on server error`, async () => {
fetchMock.mock(`end:/error502`, {
status: 502,
body: `Bad gateway`
});
let resolver: CallableFunction;
const receivedData = new Promise(resolve => (resolver = resolve));
const subscr = getHttpSubscriptionSSEInstance(`/error502`, (d: any) => resolver(d));
subscr.start();
const data = await receivedData;
expect((<ErrorDescription>data)?.type).toEqual(ErrorType.SERVER);
expect(subscr.active).toBeFalsy();
await subscr.stop();
});

it(`stops on client error`, async () => {
let resolver: CallableFunction;
const receivedData = new Promise(resolve => (resolver = resolve));
const subscr = getHttpSubscriptionSSEInstance(`/error400-expected-format`, (d: any) => resolver(d));
subscr.start();
const data = await receivedData;
expect((<ErrorDescription>data)?.type).toEqual(ErrorType.CLIENT);
expect(subscr.active).toBeFalsy();
await subscr.stop();
});
});
});
22 changes: 19 additions & 3 deletions client/src/app/worker/http/http-subscription-sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,41 @@ export class HttpSubscriptionSSE extends HttpSubscription {
}
}

let data = ``;
let data: string | null = null;
if (next) {
data = new TextDecoder().decode(next);
this.callbacks.onData(data);
}

if (!response.ok) {
// throw { response, content: next };
const error = this.parseErrorFromResponse(response, await this.parseNonOkResponse(data));
if (this.callbacks.onError) {
this.callbacks.onError(error);
} else {
this.callbacks.onData(error);
}
this._active = false;
} else if (data) {
this.callbacks.onData(data);
}
} catch (e) {
if (e.name !== `AbortError`) {
throw e;
}
}

this.abortCtrl = undefined;
if (this.abortResolver) {
this.abortResolver();
}

this._active = false;
}

private async parseNonOkResponse(data: string): Promise<unknown> {
try {
return JSON.parse(data);
} catch (_) {
return data;
}
}
}

0 comments on commit a9ac43e

Please sign in to comment.