From 2f13d65708ff4cdf2bd51b8b149eab86fc7f1596 Mon Sep 17 00:00:00 2001
From: Andrew Bulat <andrii.bulat@gmail.com>
Date: Tue, 5 Nov 2024 06:24:00 +0000
Subject: [PATCH] Add LiveObjects subscriptions tests

---
 test/realtime/live_objects.test.js | 521 ++++++++++++++++++++++++++++-
 1 file changed, 518 insertions(+), 3 deletions(-)

diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js
index a1a9ffaef..0adaefbed 100644
--- a/test/realtime/live_objects.test.js
+++ b/test/realtime/live_objects.test.js
@@ -562,7 +562,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],
               ),
             ).to.not.exist;
 
-            // create map with references. need to created referenced objects first to obtain their object ids
+            // create map with references. need to create referenced objects first to obtain their object ids
             const { objectId: referencedMapObjectId } = await liveObjectsHelper.stateRequest(
               channelName,
               liveObjectsHelper.mapCreateOp({ entries: { stringKey: { data: { value: 'stringValue' } } } }),
@@ -905,7 +905,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],
         });
       }
 
-      const operationsDuringSyncSequence = [
+      const applyOperationsDuringSyncScenarios = [
         {
           description: 'state operation messages are buffered during STATE_SYNC sequence',
           action: async (ctx) => {
@@ -1206,7 +1206,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],
         },
       ];
 
-      for (const scenario of operationsDuringSyncSequence) {
+      for (const scenario of applyOperationsDuringSyncScenarios) {
         if (scenario.skip === true) {
           continue;
         }
@@ -1231,6 +1231,521 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],
           }, client);
         });
       }
+
+      const subscriptionCallbacksScenarios = [
+        {
+          description: 'can subscribe to the incoming COUNTER_INC operation on a LiveCounter',
+          action: async (ctx) => {
+            const { root, liveObjectsHelper, channelName, sampleCounterKey, sampleCounterObjectId } = ctx;
+
+            const counter = root.get(sampleCounterKey);
+            const subscriptionPromise = new Promise((resolve, reject) =>
+              counter.subscribe((update) => {
+                try {
+                  expect(update).to.deep.equal(
+                    { update: { inc: 1 } },
+                    'Check counter subscription callback is called with an expected update object for COUNTER_INC operation',
+                  );
+                  resolve();
+                } catch (error) {
+                  reject(error);
+                }
+              }),
+            );
+
+            await liveObjectsHelper.stateRequest(
+              channelName,
+              liveObjectsHelper.counterIncOp({
+                objectId: sampleCounterObjectId,
+                amount: 1,
+              }),
+            );
+
+            await subscriptionPromise;
+          },
+        },
+
+        {
+          description: 'can subscribe to the incoming MAP_SET operation on a LiveMap',
+          action: async (ctx) => {
+            const { root, liveObjectsHelper, channelName, sampleMapKey, sampleMapObjectId } = ctx;
+
+            const map = root.get(sampleMapKey);
+            const subscriptionPromise = new Promise((resolve, reject) =>
+              map.subscribe((update) => {
+                try {
+                  expect(update).to.deep.equal(
+                    { update: { stringKey: 'updated' } },
+                    'Check map subscription callback is called with an expected update object for MAP_SET operation',
+                  );
+                  resolve();
+                } catch (error) {
+                  reject(error);
+                }
+              }),
+            );
+
+            await liveObjectsHelper.stateRequest(
+              channelName,
+              liveObjectsHelper.mapSetOp({
+                objectId: sampleMapObjectId,
+                key: 'stringKey',
+                data: { value: 'stringValue' },
+              }),
+            );
+
+            await subscriptionPromise;
+          },
+        },
+
+        {
+          description: 'can subscribe to the incoming MAP_REMOVE operation on a LiveMap',
+          action: async (ctx) => {
+            const { root, liveObjectsHelper, channelName, sampleMapKey, sampleMapObjectId } = ctx;
+
+            const map = root.get(sampleMapKey);
+            const subscriptionPromise = new Promise((resolve, reject) =>
+              map.subscribe((update) => {
+                try {
+                  expect(update).to.deep.equal(
+                    { update: { stringKey: 'deleted' } },
+                    'Check map subscription callback is called with an expected update object for MAP_REMOVE operation',
+                  );
+                  resolve();
+                } catch (error) {
+                  reject(error);
+                }
+              }),
+            );
+
+            await liveObjectsHelper.stateRequest(
+              channelName,
+              liveObjectsHelper.mapRemoveOp({
+                objectId: sampleMapObjectId,
+                key: 'stringKey',
+              }),
+            );
+
+            await subscriptionPromise;
+          },
+        },
+
+        {
+          description: 'can subscribe to multiple incoming operations on a LiveCounter',
+          action: async (ctx) => {
+            const { root, liveObjectsHelper, channelName, sampleCounterKey, sampleCounterObjectId } = ctx;
+
+            const counter = root.get(sampleCounterKey);
+            const expectedCounterIncrements = [100, -100, Number.MAX_SAFE_INTEGER, -Number.MAX_SAFE_INTEGER];
+            let currentUpdateIndex = 0;
+
+            const subscriptionPromise = new Promise((resolve, reject) =>
+              counter.subscribe((update) => {
+                try {
+                  const expectedInc = expectedCounterIncrements[currentUpdateIndex];
+                  expect(update).to.deep.equal(
+                    { update: { inc: expectedInc } },
+                    `Check counter subscription callback is called with an expected update object for ${currentUpdateIndex + 1} times`,
+                  );
+
+                  if (currentUpdateIndex === expectedCounterIncrements.length - 1) {
+                    resolve();
+                  }
+
+                  currentUpdateIndex++;
+                } catch (error) {
+                  reject(error);
+                }
+              }),
+            );
+
+            for (const increment of expectedCounterIncrements) {
+              await liveObjectsHelper.stateRequest(
+                channelName,
+                liveObjectsHelper.counterIncOp({
+                  objectId: sampleCounterObjectId,
+                  amount: increment,
+                }),
+              );
+            }
+
+            await subscriptionPromise;
+          },
+        },
+
+        {
+          description: 'can subscribe to the incoming operations on a LiveMap',
+          action: async (ctx) => {
+            const { root, liveObjectsHelper, channelName, sampleMapKey, sampleMapObjectId } = ctx;
+
+            const map = root.get(sampleMapKey);
+            const expectedMapUpdates = [
+              { update: { foo: 'updated' } },
+              { update: { bar: 'updated' } },
+              { update: { foo: 'deleted' } },
+              { update: { baz: 'updated' } },
+              { update: { bar: 'deleted' } },
+            ];
+            let currentUpdateIndex = 0;
+
+            const subscriptionPromise = new Promise((resolve, reject) =>
+              map.subscribe((update) => {
+                try {
+                  expect(update).to.deep.equal(
+                    expectedMapUpdates[currentUpdateIndex],
+                    `Check map subscription callback is called with an expected update object for ${currentUpdateIndex + 1} times`,
+                  );
+
+                  if (currentUpdateIndex === expectedMapUpdates.length - 1) {
+                    resolve();
+                  }
+
+                  currentUpdateIndex++;
+                } catch (error) {
+                  reject(error);
+                }
+              }),
+            );
+
+            await liveObjectsHelper.stateRequest(
+              channelName,
+              liveObjectsHelper.mapSetOp({
+                objectId: sampleMapObjectId,
+                key: 'foo',
+                data: { value: 'something' },
+              }),
+            );
+
+            await liveObjectsHelper.stateRequest(
+              channelName,
+              liveObjectsHelper.mapSetOp({
+                objectId: sampleMapObjectId,
+                key: 'bar',
+                data: { value: 'something' },
+              }),
+            );
+
+            await liveObjectsHelper.stateRequest(
+              channelName,
+              liveObjectsHelper.mapRemoveOp({
+                objectId: sampleMapObjectId,
+                key: 'foo',
+              }),
+            );
+
+            await liveObjectsHelper.stateRequest(
+              channelName,
+              liveObjectsHelper.mapSetOp({
+                objectId: sampleMapObjectId,
+                key: 'baz',
+                data: { value: 'something' },
+              }),
+            );
+
+            await liveObjectsHelper.stateRequest(
+              channelName,
+              liveObjectsHelper.mapRemoveOp({
+                objectId: sampleMapObjectId,
+                key: 'bar',
+              }),
+            );
+
+            await subscriptionPromise;
+          },
+        },
+
+        {
+          description: 'can unsubscribe from LiveCounter updates via returned "unsubscribe" callback',
+          action: async (ctx) => {
+            const { root, liveObjectsHelper, channelName, sampleCounterKey, sampleCounterObjectId } = ctx;
+
+            const counter = root.get(sampleCounterKey);
+            let callbackCalled = 0;
+            const subscriptionPromise = new Promise((resolve) => {
+              const { unsubscribe } = counter.subscribe(() => {
+                callbackCalled++;
+                // unsubscribe from future updates after the first call
+                unsubscribe();
+                resolve();
+              });
+            });
+
+            const increments = 3;
+            for (let i = 0; i < increments; i++) {
+              await liveObjectsHelper.stateRequest(
+                channelName,
+                liveObjectsHelper.counterIncOp({
+                  objectId: sampleCounterObjectId,
+                  amount: 1,
+                }),
+              );
+            }
+
+            await subscriptionPromise;
+
+            expect(counter.value()).to.equal(3, 'Check counter has final expected value after all increments');
+            expect(callbackCalled).to.equal(1, 'Check subscription callback was only called once');
+          },
+        },
+
+        {
+          description: 'can unsubscribe from LiveCounter updates via LiveCounter.unsubscribe() call',
+          action: async (ctx) => {
+            const { root, liveObjectsHelper, channelName, sampleCounterKey, sampleCounterObjectId } = ctx;
+
+            const counter = root.get(sampleCounterKey);
+            let callbackCalled = 0;
+            const subscriptionPromise = new Promise((resolve) => {
+              const listener = () => {
+                callbackCalled++;
+                // unsubscribe from future updates after the first call
+                counter.unsubscribe(listener);
+                resolve();
+              };
+
+              counter.subscribe(listener);
+            });
+
+            const increments = 3;
+            for (let i = 0; i < increments; i++) {
+              await liveObjectsHelper.stateRequest(
+                channelName,
+                liveObjectsHelper.counterIncOp({
+                  objectId: sampleCounterObjectId,
+                  amount: 1,
+                }),
+              );
+            }
+
+            await subscriptionPromise;
+
+            expect(counter.value()).to.equal(3, 'Check counter has final expected value after all increments');
+            expect(callbackCalled).to.equal(1, 'Check subscription callback was only called once');
+          },
+        },
+
+        {
+          description: 'can remove all LiveCounter update listeners via LiveCounter.unsubscribeAll() call',
+          action: async (ctx) => {
+            const { root, liveObjectsHelper, channelName, sampleCounterKey, sampleCounterObjectId } = ctx;
+
+            const counter = root.get(sampleCounterKey);
+            const callbacks = 3;
+            const callbacksCalled = new Array(callbacks).fill(0);
+            const subscriptionPromises = [];
+
+            for (let i = 0; i < callbacks; i++) {
+              const promise = new Promise((resolve) => {
+                counter.subscribe(() => {
+                  callbacksCalled[i]++;
+                  resolve();
+                });
+              });
+              subscriptionPromises.push(promise);
+            }
+
+            const increments = 3;
+            for (let i = 0; i < increments; i++) {
+              await liveObjectsHelper.stateRequest(
+                channelName,
+                liveObjectsHelper.counterIncOp({
+                  objectId: sampleCounterObjectId,
+                  amount: 1,
+                }),
+              );
+
+              if (i === 0) {
+                // unsub all after first operation
+                counter.unsubscribeAll();
+              }
+            }
+
+            await Promise.all(subscriptionPromises);
+
+            expect(counter.value()).to.equal(3, 'Check counter has final expected value after all increments');
+            callbacksCalled.forEach((x) => expect(x).to.equal(1, 'Check subscription callbacks were called once each'));
+          },
+        },
+
+        {
+          description: 'can unsubscribe from LiveMap updates via returned "unsubscribe" callback',
+          action: async (ctx) => {
+            const { root, liveObjectsHelper, channelName, sampleMapKey, sampleMapObjectId } = ctx;
+
+            const map = root.get(sampleMapKey);
+            let callbackCalled = 0;
+            const subscriptionPromise = new Promise((resolve) => {
+              const { unsubscribe } = map.subscribe(() => {
+                callbackCalled++;
+                // unsubscribe from future updates after the first call
+                unsubscribe();
+                resolve();
+              });
+            });
+
+            const mapSets = 3;
+            for (let i = 0; i < mapSets; i++) {
+              await liveObjectsHelper.stateRequest(
+                channelName,
+                liveObjectsHelper.mapSetOp({
+                  objectId: sampleMapObjectId,
+                  key: `foo-${i}`,
+                  data: { value: 'exists' },
+                }),
+              );
+            }
+
+            await subscriptionPromise;
+
+            for (let i = 0; i < mapSets; i++) {
+              expect(map.get(`foo-${i}`)).to.equal(
+                'exists',
+                `Check map has value for key "foo-${i}" after all map sets`,
+              );
+            }
+            expect(callbackCalled).to.equal(1, 'Check subscription callback was only called once');
+          },
+        },
+
+        {
+          description: 'can unsubscribe from LiveMap updates via LiveMap.unsubscribe() call',
+          action: async (ctx) => {
+            const { root, liveObjectsHelper, channelName, sampleMapKey, sampleMapObjectId } = ctx;
+
+            const map = root.get(sampleMapKey);
+            let callbackCalled = 0;
+            const subscriptionPromise = new Promise((resolve) => {
+              const listener = () => {
+                callbackCalled++;
+                // unsubscribe from future updates after the first call
+                map.unsubscribe(listener);
+                resolve();
+              };
+
+              map.subscribe(listener);
+            });
+
+            const mapSets = 3;
+            for (let i = 0; i < mapSets; i++) {
+              await liveObjectsHelper.stateRequest(
+                channelName,
+                liveObjectsHelper.mapSetOp({
+                  objectId: sampleMapObjectId,
+                  key: `foo-${i}`,
+                  data: { value: 'exists' },
+                }),
+              );
+            }
+
+            await subscriptionPromise;
+
+            for (let i = 0; i < mapSets; i++) {
+              expect(map.get(`foo-${i}`)).to.equal(
+                'exists',
+                `Check map has value for key "foo-${i}" after all map sets`,
+              );
+            }
+            expect(callbackCalled).to.equal(1, 'Check subscription callback was only called once');
+          },
+        },
+
+        {
+          description: 'can remove all LiveMap update listeners via LiveMap.unsubscribeAll() call',
+          action: async (ctx) => {
+            const { root, liveObjectsHelper, channelName, sampleMapKey, sampleMapObjectId } = ctx;
+
+            const map = root.get(sampleMapKey);
+            const callbacks = 3;
+            const callbacksCalled = new Array(callbacks).fill(0);
+            const subscriptionPromises = [];
+
+            for (let i = 0; i < callbacks; i++) {
+              const promise = new Promise((resolve) => {
+                map.subscribe(() => {
+                  callbacksCalled[i]++;
+                  resolve();
+                });
+              });
+              subscriptionPromises.push(promise);
+            }
+
+            const mapSets = 3;
+            for (let i = 0; i < mapSets; i++) {
+              await liveObjectsHelper.stateRequest(
+                channelName,
+                liveObjectsHelper.mapSetOp({
+                  objectId: sampleMapObjectId,
+                  key: `foo-${i}`,
+                  data: { value: 'exists' },
+                }),
+              );
+
+              if (i === 0) {
+                // unsub all after first operation
+                map.unsubscribeAll();
+              }
+            }
+
+            await Promise.all(subscriptionPromises);
+
+            for (let i = 0; i < mapSets; i++) {
+              expect(map.get(`foo-${i}`)).to.equal(
+                'exists',
+                `Check map has value for key "foo-${i}" after all map sets`,
+              );
+            }
+            callbacksCalled.forEach((x) => expect(x).to.equal(1, 'Check subscription callbacks were called once each'));
+          },
+        },
+      ];
+
+      for (const scenario of subscriptionCallbacksScenarios) {
+        if (scenario.skip === true) {
+          continue;
+        }
+
+        /** @nospec */
+        it(scenario.description, async function () {
+          const helper = this.test.helper;
+          const liveObjectsHelper = new LiveObjectsHelper(helper);
+          const client = RealtimeWithLiveObjects(helper);
+
+          await helper.monitorConnectionThenCloseAndFinish(async () => {
+            const channelName = scenario.description;
+            const channel = client.channels.get(channelName, channelOptionsWithLiveObjects());
+            const liveObjects = channel.liveObjects;
+
+            await channel.attach();
+            const root = await liveObjects.getRoot();
+
+            const sampleMapKey = 'sampleMap';
+            const sampleCounterKey = 'sampleCounter';
+
+            // prepare map and counter objects for use by the scenario
+            const { objectId: sampleMapObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, {
+              mapObjectId: 'root',
+              key: sampleMapKey,
+              createOp: liveObjectsHelper.mapCreateOp(),
+            });
+            const { objectId: sampleCounterObjectId } = await liveObjectsHelper.createAndSetOnMap(channelName, {
+              mapObjectId: 'root',
+              key: sampleCounterKey,
+              createOp: liveObjectsHelper.counterCreateOp(),
+            });
+
+            await scenario.action({
+              root,
+              liveObjectsHelper,
+              channelName,
+              channel,
+              sampleMapKey,
+              sampleMapObjectId,
+              sampleCounterKey,
+              sampleCounterObjectId,
+            });
+          }, client);
+        });
+      }
     });
 
     /** @nospec */