diff --git a/test/common/modules/live_objects_helper.js b/test/common/modules/live_objects_helper.js index b2793804e..82c65b737 100644 --- a/test/common/modules/live_objects_helper.js +++ b/test/common/modules/live_objects_helper.js @@ -3,7 +3,9 @@ /** * LiveObjects helper to create pre-determined state tree on channels */ -define(['shared_helper'], function (Helper) { +define(['ably', 'shared_helper', 'live_objects'], function (Ably, Helper, LiveObjectsPlugin) { + const createPM = Ably.makeProtocolMessageFromDeserialized({ LiveObjectsPlugin }); + const ACTIONS = { MAP_CREATE: 0, MAP_SET: 1, @@ -18,6 +20,7 @@ define(['shared_helper'], function (Helper) { class LiveObjectsHelper { constructor(helper) { + this._helper = helper; this._rest = helper.AblyRest({ useBinaryProtocol: false }); } @@ -171,6 +174,97 @@ define(['shared_helper'], function (Helper) { return op; } + mapObject(opts) { + const { objectId, regionalTimeserial, entries } = opts; + const obj = { + object: { + objectId, + regionalTimeserial, + map: { entries }, + }, + }; + + return obj; + } + + counterObject(opts) { + const { objectId, regionalTimeserial, count } = opts; + const obj = { + object: { + objectId, + regionalTimeserial, + counter: { + created: true, + count, + }, + }, + }; + + return obj; + } + + stateOperationMessage(opts) { + const { channelName, serial, state } = opts; + + state?.forEach((x, i) => (x.serial = `${serial}:${i}`)); + + return { + action: 19, // STATE + channel: channelName, + channelSerial: serial, + state: state ?? [], + }; + } + + stateObjectMessage(opts) { + const { channelName, syncSerial, state } = opts; + + return { + action: 20, // STATE_SYNC + channel: channelName, + channelSerial: syncSerial, + state: state ?? [], + }; + } + + async processStateOperationMessageOnChannel(opts) { + const { channel, ...rest } = opts; + + this._helper.recordPrivateApi('call.channel.processMessage'); + this._helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); + await channel.processMessage( + createPM( + this.stateOperationMessage({ + ...rest, + channelName: channel.name, + }), + ), + ); + } + + async processStateObjectMessageOnChannel(opts) { + const { channel, ...rest } = opts; + + this._helper.recordPrivateApi('call.channel.processMessage'); + this._helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); + await channel.processMessage( + createPM( + this.stateObjectMessage({ + ...rest, + channelName: channel.name, + }), + ), + ); + } + + fakeMapObjectId() { + return `map:${Helper.randomString()}`; + } + + fakeCounterObjectId() { + return `counter:${Helper.randomString()}`; + } + async stateRequest(channelName, opBody) { if (Array.isArray(opBody)) { throw new Error(`Only single object state requests are supported`); diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index c0da311f3..38ce39d43 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -55,6 +55,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], /** @nospec */ it(`doesn't break when it receives a STATE ProtocolMessage`, async function () { const helper = this.test.helper; + const liveObjectsHelper = new LiveObjectsHelper(helper); const testClient = helper.AblyRealtime(); await helper.monitorConnectionThenCloseAndFinish(async () => { @@ -67,25 +68,13 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], await helper.monitorConnectionThenCloseAndFinish(async () => { // inject STATE message that should be ignored and not break anything without LiveObjects plugin - helper.recordPrivateApi('call.channel.processMessage'); - helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); - await testChannel.processMessage( - createPM({ - action: 19, - channel: 'channel', - channelSerial: 'serial:', - state: [ - { - operation: { - action: 1, - objectId: 'root', - mapOp: { key: 'stringKey', data: { value: 'stringValue' } }, - }, - serial: 'a@0-0', - }, - ], - }), - ); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel: testChannel, + serial: '@0-0', + state: [ + liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'stringKey', data: { value: 'stringValue' } }), + ], + }); const publishChannel = publishClient.channels.get('channel'); await publishChannel.publish(null, 'test'); @@ -99,6 +88,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], /** @nospec */ it(`doesn't break when it receives a STATE_SYNC ProtocolMessage`, async function () { const helper = this.test.helper; + const liveObjectsHelper = new LiveObjectsHelper(helper); const testClient = helper.AblyRealtime(); await helper.monitorConnectionThenCloseAndFinish(async () => { @@ -111,24 +101,11 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], await helper.monitorConnectionThenCloseAndFinish(async () => { // inject STATE_SYNC message that should be ignored and not break anything without LiveObjects plugin - helper.recordPrivateApi('call.channel.processMessage'); - helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); - await testChannel.processMessage( - createPM({ - action: 20, - channel: 'channel', - channelSerial: 'serial:', - state: [ - { - object: { - objectId: 'root', - regionalTimeserial: 'a@0-0', - map: {}, - }, - }, - ], - }), - ); + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel: testChannel, + syncSerial: 'serial:', + state: [liveObjectsHelper.mapObject({ objectId: 'root', regionalTimeserial: '@0-0' })], + }); const publishChannel = publishClient.channels.get('channel'); await publishChannel.publish(null, 'test'); @@ -255,6 +232,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], /** @nospec */ it('getRoot() waits for subsequent STATE_SYNC to finish before resolving', async function () { const helper = this.test.helper; + const liveObjectsHelper = new LiveObjectsHelper(helper); const client = RealtimeWithLiveObjects(helper); await helper.monitorConnectionThenCloseAndFinish(async () => { @@ -266,23 +244,17 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], await liveObjects.getRoot(); // inject STATE_SYNC message to emulate start of a new sequence - helper.recordPrivateApi('call.channel.processMessage'); - helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); - await channel.processMessage( - createPM({ - action: 20, - channel: 'channel', - // have cursor so client awaits for additional STATE_SYNC messages - channelSerial: 'serial:cursor', - state: [], - }), - ); + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + // have cursor so client awaits for additional STATE_SYNC messages + syncSerial: 'serial:cursor', + }); let getRootResolved = false; - let newRoot; + let root; liveObjects.getRoot().then((value) => { getRootResolved = true; - newRoot = value; + root = value; }); // wait for next tick to check that getRoot() promise handler didn't proc @@ -291,42 +263,26 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], expect(getRootResolved, 'Check getRoot() is not resolved while STATE_SYNC is in progress').to.be.false; - // inject next STATE_SYNC message - helper.recordPrivateApi('call.channel.processMessage'); - helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); - await channel.processMessage( - createPM({ - action: 20, - channel: 'channel', - // no cursor to indicate the end of STATE_SYNC messages - channelSerial: 'serial:', - state: [ - { - object: { - objectId: 'root', - regionalTimeserial: 'a@0-0', - map: { - entries: { - key: { - timeserial: 'a@0-0', - data: { - value: 1, - }, - }, - }, - }, - }, - }, - ], - }), - ); + // inject final STATE_SYNC message + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + // no cursor to indicate the end of STATE_SYNC messages + syncSerial: 'serial:', + state: [ + liveObjectsHelper.mapObject({ + objectId: 'root', + regionalTimeserial: '@0-0', + entries: { key: { timeserial: '@0-0', data: { value: 1 } } }, + }), + ], + }); // wait for next tick for getRoot() handler to process helper.recordPrivateApi('call.Platform.nextTick'); await new Promise((res) => nextTick(res)); expect(getRootResolved, 'Check getRoot() is resolved when STATE_SYNC sequence has ended').to.be.true; - expect(newRoot.get('key')).to.equal(1, 'Check new root after STATE_SYNC sequence has expected key'); + expect(root.get('key')).to.equal(1, 'Check new root after STATE_SYNC sequence has expected key'); }, client); });