From b09f2f957b718bc42860023f7ab8823ff97337f8 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Tue, 5 Nov 2024 06:24:00 +0000 Subject: [PATCH] Add LiveObjects subscriptions tests --- test/realtime/live_objects.test.js | 521 ++++++++++++++++++++++++++++- 1 file changed, 518 insertions(+), 3 deletions(-) diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index a1a9ffaef..0adaefbed 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -562,7 +562,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], ), ).to.not.exist; - // create map with references. need to created referenced objects first to obtain their object ids + // create map with references. need to create referenced objects first to obtain their object ids const { objectId: referencedMapObjectId } = await liveObjectsHelper.stateRequest( channelName, liveObjectsHelper.mapCreateOp({ entries: { stringKey: { data: { value: 'stringValue' } } } }), @@ -905,7 +905,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }); } - const operationsDuringSyncSequence = [ + const applyOperationsDuringSyncScenarios = [ { description: 'state operation messages are buffered during STATE_SYNC sequence', action: async (ctx) => { @@ -1206,7 +1206,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, ]; - for (const scenario of operationsDuringSyncSequence) { + for (const scenario of applyOperationsDuringSyncScenarios) { if (scenario.skip === true) { continue; } @@ -1231,6 +1231,521 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, client); }); } + + const subscriptionCallbacksScenarios = [ + { + description: 'can subscribe to the incoming COUNTER_INC operation on a LiveCounter', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, sampleCounterKey, sampleCounterObjectId } = ctx; + + const counter = root.get(sampleCounterKey); + const subscriptionPromise = new Promise((resolve, reject) => + counter.subscribe((update) => { + try { + expect(update).to.deep.equal( + { update: { inc: 1 } }, + 'Check counter subscription callback is called with an expected update object for COUNTER_INC operation', + ); + resolve(); + } catch (error) { + reject(error); + } + }), + ); + + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.counterIncOp({ + objectId: sampleCounterObjectId, + amount: 1, + }), + ); + + await subscriptionPromise; + }, + }, + + { + description: 'can subscribe to the incoming MAP_SET operation on a LiveMap', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, sampleMapKey, sampleMapObjectId } = ctx; + + const map = root.get(sampleMapKey); + const subscriptionPromise = new Promise((resolve, reject) => + map.subscribe((update) => { + try { + expect(update).to.deep.equal( + { update: { stringKey: 'updated' } }, + 'Check map subscription callback is called with an expected update object for MAP_SET operation', + ); + resolve(); + } catch (error) { + reject(error); + } + }), + ); + + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapSetOp({ + objectId: sampleMapObjectId, + key: 'stringKey', + data: { value: 'stringValue' }, + }), + ); + + await subscriptionPromise; + }, + }, + + { + description: 'can subscribe to the incoming MAP_REMOVE operation on a LiveMap', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, sampleMapKey, sampleMapObjectId } = ctx; + + const map = root.get(sampleMapKey); + const subscriptionPromise = new Promise((resolve, reject) => + map.subscribe((update) => { + try { + expect(update).to.deep.equal( + { update: { stringKey: 'deleted' } }, + 'Check map subscription callback is called with an expected update object for MAP_REMOVE operation', + ); + resolve(); + } catch (error) { + reject(error); + } + }), + ); + + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapRemoveOp({ + objectId: sampleMapObjectId, + key: 'stringKey', + }), + ); + + await subscriptionPromise; + }, + }, + + { + description: 'can subscribe to multiple incoming operations on a LiveCounter', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, sampleCounterKey, sampleCounterObjectId } = ctx; + + const counter = root.get(sampleCounterKey); + const expectedCounterIncrements = [100, -100, Number.MAX_SAFE_INTEGER, -Number.MAX_SAFE_INTEGER]; + let currentUpdateIndex = 0; + + const subscriptionPromise = new Promise((resolve, reject) => + counter.subscribe((update) => { + try { + const expectedInc = expectedCounterIncrements[currentUpdateIndex]; + expect(update).to.deep.equal( + { update: { inc: expectedInc } }, + `Check counter subscription callback is called with an expected update object for ${currentUpdateIndex + 1} times`, + ); + + if (currentUpdateIndex === expectedCounterIncrements.length - 1) { + resolve(); + } + + currentUpdateIndex++; + } catch (error) { + reject(error); + } + }), + ); + + for (const increment of expectedCounterIncrements) { + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.counterIncOp({ + objectId: sampleCounterObjectId, + amount: increment, + }), + ); + } + + await subscriptionPromise; + }, + }, + + { + description: 'can subscribe to the incoming operations on a LiveMap', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, sampleMapKey, sampleMapObjectId } = ctx; + + const map = root.get(sampleMapKey); + const expectedMapUpdates = [ + { update: { foo: 'updated' } }, + { update: { bar: 'updated' } }, + { update: { foo: 'deleted' } }, + { update: { baz: 'updated' } }, + { update: { bar: 'deleted' } }, + ]; + let currentUpdateIndex = 0; + + const subscriptionPromise = new Promise((resolve, reject) => + map.subscribe((update) => { + try { + expect(update).to.deep.equal( + expectedMapUpdates[currentUpdateIndex], + `Check map subscription callback is called with an expected update object for ${currentUpdateIndex + 1} times`, + ); + + if (currentUpdateIndex === expectedMapUpdates.length - 1) { + resolve(); + } + + currentUpdateIndex++; + } catch (error) { + reject(error); + } + }), + ); + + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapSetOp({ + objectId: sampleMapObjectId, + key: 'foo', + data: { value: 'something' }, + }), + ); + + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapSetOp({ + objectId: sampleMapObjectId, + key: 'bar', + data: { value: 'something' }, + }), + ); + + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapRemoveOp({ + objectId: sampleMapObjectId, + key: 'foo', + }), + ); + + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapSetOp({ + objectId: sampleMapObjectId, + key: 'baz', + data: { value: 'something' }, + }), + ); + + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapRemoveOp({ + objectId: sampleMapObjectId, + key: 'bar', + }), + ); + + await subscriptionPromise; + }, + }, + + { + description: 'can unsubscribe from LiveCounter updates via returned "unsubscribe" callback', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, sampleCounterKey, sampleCounterObjectId } = ctx; + + const counter = root.get(sampleCounterKey); + let callbackCalled = 0; + const subscriptionPromise = new Promise((resolve) => { + const { unsubscribe } = counter.subscribe(() => { + callbackCalled++; + // unsubscribe from future updates after the first call + unsubscribe(); + resolve(); + }); + }); + + const increments = 3; + for (let i = 0; i < increments; i++) { + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.counterIncOp({ + objectId: sampleCounterObjectId, + amount: 1, + }), + ); + } + + await subscriptionPromise; + + expect(counter.value()).to.equal(3, 'Check counter has final expected value after all increments'); + expect(callbackCalled).to.equal(1, 'Check subscription callback was only called once'); + }, + }, + + { + description: 'can unsubscribe from LiveCounter updates via LiveCounter.unsubscribe() call', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, sampleCounterKey, sampleCounterObjectId } = ctx; + + const counter = root.get(sampleCounterKey); + let callbackCalled = 0; + const subscriptionPromise = new Promise((resolve) => { + const listener = () => { + callbackCalled++; + // unsubscribe from future updates after the first call + counter.unsubscribe(listener); + resolve(); + }; + + counter.subscribe(listener); + }); + + const increments = 3; + for (let i = 0; i < increments; i++) { + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.counterIncOp({ + objectId: sampleCounterObjectId, + amount: 1, + }), + ); + } + + await subscriptionPromise; + + expect(counter.value()).to.equal(3, 'Check counter has final expected value after all increments'); + expect(callbackCalled).to.equal(1, 'Check subscription callback was only called once'); + }, + }, + + { + description: 'can remove all LiveCounter update listeners via LiveCounter.unsubscribeAll() call', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, sampleCounterKey, sampleCounterObjectId } = ctx; + + const counter = root.get(sampleCounterKey); + const callbacks = 3; + const callbacksCalled = new Array(callbacks).fill(0); + const subscriptionPromises = []; + + for (let i = 0; i < callbacks; i++) { + const promise = new Promise((resolve) => { + counter.subscribe(() => { + callbacksCalled[i]++; + resolve(); + }); + }); + subscriptionPromises.push(promise); + } + + const increments = 3; + for (let i = 0; i < increments; i++) { + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.counterIncOp({ + objectId: sampleCounterObjectId, + amount: 1, + }), + ); + + if (i === 0) { + // unsub all after first operation + counter.unsubscribeAll(); + } + } + + await Promise.all(subscriptionPromises); + + expect(counter.value()).to.equal(3, 'Check counter has final expected value after all increments'); + callbacksCalled.forEach((x) => expect(x).to.equal(1, 'Check subscription callbacks were called once each')); + }, + }, + + { + description: 'can unsubscribe from LiveMap updates via returned "unsubscribe" callback', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, sampleMapKey, sampleMapObjectId } = ctx; + + const map = root.get(sampleMapKey); + let callbackCalled = 0; + const subscriptionPromise = new Promise((resolve) => { + const { unsubscribe } = map.subscribe(() => { + callbackCalled++; + // unsubscribe from future updates after the first call + unsubscribe(); + resolve(); + }); + }); + + const mapSets = 3; + for (let i = 0; i < mapSets; i++) { + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapSetOp({ + objectId: sampleMapObjectId, + key: `foo-${i}`, + data: { value: 'exists' }, + }), + ); + } + + await subscriptionPromise; + + for (let i = 0; i < mapSets; i++) { + expect(map.get(`foo-${i}`)).to.equal( + 'exists', + `Check map has value for key "foo-${i}" after all map sets`, + ); + } + expect(callbackCalled).to.equal(1, 'Check subscription callback was only called once'); + }, + }, + + { + description: 'can unsubscribe from LiveMap updates via LiveMap.unsubscribe() call', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, sampleMapKey, sampleMapObjectId } = ctx; + + const map = root.get(sampleMapKey); + let callbackCalled = 0; + const subscriptionPromise = new Promise((resolve) => { + const listener = () => { + callbackCalled++; + // unsubscribe from future updates after the first call + map.unsubscribe(listener); + resolve(); + }; + + map.subscribe(listener); + }); + + const mapSets = 3; + for (let i = 0; i < mapSets; i++) { + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapSetOp({ + objectId: sampleMapObjectId, + key: `foo-${i}`, + data: { value: 'exists' }, + }), + ); + } + + await subscriptionPromise; + + for (let i = 0; i < mapSets; i++) { + expect(map.get(`foo-${i}`)).to.equal( + 'exists', + `Check map has value for key "foo-${i}" after all map sets`, + ); + } + expect(callbackCalled).to.equal(1, 'Check subscription callback was only called once'); + }, + }, + + { + description: 'can remove all LiveMap update listeners via LiveMap.unsubscribeAll() call', + action: async (ctx) => { + const { root, liveObjectsHelper, channelName, sampleMapKey, sampleMapObjectId } = ctx; + + const map = root.get(sampleMapKey); + const callbacks = 3; + const callbacksCalled = new Array(callbacks).fill(0); + const subscriptionPromises = []; + + for (let i = 0; i < callbacks; i++) { + const promise = new Promise((resolve) => { + map.subscribe(() => { + callbacksCalled[i]++; + resolve(); + }); + }); + subscriptionPromises.push(promise); + } + + const mapSets = 3; + for (let i = 0; i < mapSets; i++) { + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapSetOp({ + objectId: sampleMapObjectId, + key: `foo-${i}`, + data: { value: 'exists' }, + }), + ); + + if (i === 0) { + // unsub all after first operation + map.unsubscribeAll(); + } + } + + await Promise.all(subscriptionPromises); + + for (let i = 0; i < mapSets; i++) { + expect(map.get(`foo-${i}`)).to.equal( + 'exists', + `Check map has value for key "foo-${i}" after all map sets`, + ); + } + callbacksCalled.forEach((x) => expect(x).to.equal(1, 'Check subscription callbacks were called once each')); + }, + }, + ]; + + for (const scenario of subscriptionCallbacksScenarios) { + if (scenario.skip === true) { + continue; + } + + /** @nospec */ + it(scenario.description, async function () { + const helper = this.test.helper; + const liveObjectsHelper = new LiveObjectsHelper(helper); + const client = RealtimeWithLiveObjects(helper); + + await helper.monitorConnectionThenCloseAndFinish(async () => { + const channelName = scenario.description; + const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); + const liveObjects = channel.liveObjects; + + await channel.attach(); + const root = await liveObjects.getRoot(); + + const sampleMapKey = 'sampleMap'; + const sampleCounterKey = 'sampleCounter'; + + // prepare map and counter objects for use by the scenario + const { objectId: sampleMapObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: sampleMapKey, + createOp: liveObjectsHelper.mapCreateOp(), + }); + const { objectId: sampleCounterObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, { + mapObjectId: 'root', + key: sampleCounterKey, + createOp: liveObjectsHelper.counterCreateOp(), + }); + + await scenario.action({ + root, + liveObjectsHelper, + channelName, + channel, + sampleMapKey, + sampleMapObjectId, + sampleCounterKey, + sampleCounterObjectId, + }); + }, client); + }); + } }); /** @nospec */