Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tanderson-ld committed Dec 9, 2024
1 parent 842e5f4 commit 6e8a17b
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 20 deletions.
121 changes: 121 additions & 0 deletions packages/shared/common/__tests__/internal/fdv2/PayloadReader.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { EventListener, EventName } from '../../../src/api';
import { EventStream, Payload, PayloadReader } from '../../../src/internal/fdv2/payloadReader';

class MockEventStreamm implements EventStream {
private _listeners: {
[event: EventName]: EventListener;
} = {};

addEventListener(eventName: EventName, listener: EventListener): void {
this._listeners[eventName] = listener;
}

simulateEvent(eventName: EventName, event: { data?: string }) {
this._listeners[eventName](event);
}
}

it('it sets basis to true when intent code is xfer-full', () => {
const mockStream = new MockEventStreamm();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadReader(mockStream, {
mockKind: (it) => it, // obj processor that just returns the same obj
});
readerUnderTest.addPayloadListener((it) => {
receivedPayloads.push(it);
});

mockStream.simulateEvent('server-intent', {
data: '{"data": {"payloads": [{"intentCode": "xfer-full", "id": "mockId"}]}}',
});
mockStream.simulateEvent('payload-transferred', {
data: '{"data": {"state": "mockState", "version": 1}}',
});
expect(receivedPayloads.length).toEqual(1);
expect(receivedPayloads[0].id).toEqual('mockId');
expect(receivedPayloads[0].state).toEqual('mockState');
expect(receivedPayloads[0].basis).toEqual(true);
});

it('it sets basis to false when intent code is xfer-changes', () => {
const mockStream = new MockEventStreamm();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadReader(mockStream, {
mockKind: (it) => it, // obj processor that just returns the same obj
});
readerUnderTest.addPayloadListener((it) => {
receivedPayloads.push(it);
});

mockStream.simulateEvent('server-intent', {
data: '{"data": {"payloads": [{"intentCode": "xfer-changes", "id": "mockId"}]}}',
});
mockStream.simulateEvent('payload-transferred', {
data: '{"data": {"state": "mockState", "version": 1}}',
});
expect(receivedPayloads.length).toEqual(1);
expect(receivedPayloads[0].id).toEqual('mockId');
expect(receivedPayloads[0].state).toEqual('mockState');
expect(receivedPayloads[0].basis).toEqual(false);
});

it('it includes multiple types of updates in payload', () => {
const mockStream = new MockEventStreamm();
const receivedPayloads: Payload[] = [];
const readerUnderTest = new PayloadReader(mockStream, {
mockKind: (it) => it, // obj processor that just returns the same obj
});
readerUnderTest.addPayloadListener((it) => {
receivedPayloads.push(it);
});

mockStream.simulateEvent('server-intent', {
data: '{"data": {"payloads": [{"intentCode": "xfer-full", "id": "mockId"}]}}',
});
mockStream.simulateEvent('put-object', {
data: '{"data": {"kind": "mockKind", "key": "flagA", "version": 123, "object": {"objectFieldA": "objectValueA"}}}',
});
mockStream.simulateEvent('delete-object', {
data: '{"data": {"kind": "mockKind", "key": "flagB", "version": 123, "object": {"objectFieldB": "objectValueB"}}}',
});
mockStream.simulateEvent('put-object', {
data: '{"data": {"kind": "mockKind", "key": "flagC", "version": 123, "object": {"objectFieldC": "objectValueC"}}}',
});
mockStream.simulateEvent('payload-transferred', {
data: '{"data": {"state": "mockState", "version": 1}}',
});
expect(receivedPayloads.length).toEqual(1);
expect(receivedPayloads[0].id).toEqual('mockId');
expect(receivedPayloads[0].state).toEqual('mockState');
expect(receivedPayloads[0].basis).toEqual(true);
expect(receivedPayloads[0].updates.length).toEqual(3);
expect(receivedPayloads[0].updates[0].object).toEqual({ objectFieldA: 'objectValueA' });
expect(receivedPayloads[0].updates[0].deleted).toEqual(false); // TODO: resume at deciding if deleted should be optional (it is at the moment and causing this to fail)
expect(receivedPayloads[0].updates[1].object).toEqual({ objectFieldB: 'objectValueB' });
expect(receivedPayloads[0].updates[1].deleted).toEqual(true);
expect(receivedPayloads[0].updates[2].object).toEqual({ objectFieldC: 'objectValueC' });
});

// it('it does not include messages thats are not between server-intent and payloader-transferred', () => {

// });

// it('logs prescribed message when goodbye event is encountered', () => {

// });

// it('logs prescribed message when error event is encountered', () => {

// });

// it('discards partially transferred data when an error is encountered', () => {

// });

// it('silently ignores unrecognized kinds', () => {

// });

// it('ignores additional payloads beyond the first payload in the server-intent message', () => {

// });
40 changes: 20 additions & 20 deletions packages/shared/common/src/internal/fdv2/payloadReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ export class PayloadReader {
tempUpdates: Update[] = [];

constructor(
eventSource: EventStream,
eventStream: EventStream,
private readonly _jsonObjConverters: JsonObjConverters,
private readonly _errorHandler?: (errorKind: DataSourceErrorKind, message: string) => void,
private readonly _logger?: LDLogger,
) {
this._attachHandler(eventSource, 'server-intent', this._processServerIntent);
this._attachHandler(eventSource, 'put-object', this._processPutObject);
this._attachHandler(eventSource, 'delete-object', this._processDeleteObject);
this._attachHandler(eventSource, 'payload-transferred', this._processPayloadTransferred);
this._attachHandler(eventStream, 'server-intent', this._processServerIntent);
this._attachHandler(eventStream, 'put-object', this._processPutObject);
this._attachHandler(eventStream, 'delete-object', this._processDeleteObject);
this._attachHandler(eventStream, 'payload-transferred', this._processPayloadTransferred);
}

addPayloadListener(listener: PayloadListener) {
Expand Down Expand Up @@ -79,8 +79,8 @@ export class PayloadReader {
});
}

private _convertJsonObj(jsonObj: any): any {
return this._jsonObjConverters[jsonObj.kind]?.(jsonObj);
private _processObj(kind: string, jsonObj: any): any {
return this._jsonObjConverters[kind]?.(jsonObj);
}

// TODO: add valid state/reset handling if an invalid message is received part way through processing and to avoid starting prcessing put/deletes before server intent is received
Expand Down Expand Up @@ -111,53 +111,53 @@ export class PayloadReader {
this.tempId = payload?.id;
};

private _processPutObject = (jsonObj: any) => {
private _processPutObject = (event?: { data?: DataObject }) => {
// if the following properties haven't been provided by now, we're in an invalid state
if (!jsonObj.kind || !jsonObj.key || !jsonObj.version || !jsonObj.object) {
if (!event?.data?.kind || !event?.data?.key || !event?.data?.version || !event?.data?.object) {
this._resetState();
return;
}

const obj = this._convertJsonObj(jsonObj);
const obj = this._processObj(event.data.kind, event.data.object);
if (!obj) {
// ignore unrecognized kinds
return;
}

this.tempUpdates.push({
kind: jsonObj.kind,
key: jsonObj.key,
version: jsonObj.version,
kind: event.data.kind,
key: event.data.key,
version: event.data.version,
object: obj,
// intentionally omit deleted for this put
});
};

private _processDeleteObject = (jsonObj: any) => {
private _processDeleteObject = (event?: { data?: DataObject }) => {
// if the following properties haven't been provided by now, we're in an invalid state
if (!jsonObj.kind || !jsonObj.key || !jsonObj.version || !jsonObj.object) {
if (!event?.data?.kind || !event?.data?.key || !event?.data?.version || !event?.data?.object) {
this._resetState();
return;
}

const obj = this._convertJsonObj(jsonObj);
const obj = this._processObj(event.data.kind, event.data.object);
if (!obj) {
// ignore unrecognized kinds
return;
}

this.tempUpdates.push({
kind: jsonObj.kind,
key: jsonObj.key,
version: jsonObj.version,
kind: event.data.kind,
key: event.data.key,
version: event.data.version,
object: obj,
deleted: true,
});
};

private _processPayloadTransferred = (event?: { data?: PayloadTransferred }) => {
// if the following properties haven't been provided by now, we're in an invalid state
if (!event?.data?.state || !event.data.version || !this.tempBasis) {
if (!event?.data?.state || !event.data.version || this.tempBasis === undefined) {
this._resetState();
return;
}
Expand Down

0 comments on commit 6e8a17b

Please sign in to comment.