diff --git a/packages/agent/src/sync-engine-level.ts b/packages/agent/src/sync-engine-level.ts index 15affed2b..f73b67af9 100644 --- a/packages/agent/src/sync-engine-level.ts +++ b/packages/agent/src/sync-engine-level.ts @@ -69,6 +69,7 @@ export class SyncEngineLevel implements SyncEngine { private _db: AbstractLevel; private _syncIntervalId?: ReturnType; + private _syncLock = false; private _ulidFactory: ULIDFactory; constructor({ agent, dataPath, db }: SyncEngineLevelParams) { @@ -264,45 +265,54 @@ export class SyncEngineLevel implements SyncEngine { } public async sync(direction?: 'push' | 'pull'): Promise { - if (this._syncIntervalId) { - throw new Error('SyncEngineLevel: Cannot call sync while a sync interval is active. Call `stopSync()` first.'); + if (this._syncLock) { + throw new Error('SyncEngineLevel: Sync operation is already in progress.'); } - if (!direction || direction === 'push') { - await this.push(); - } - if (!direction || direction === 'pull') { - await this.pull(); + this._syncLock = true; + try { + if (!direction || direction === 'push') { + await this.push(); + } + if (!direction || direction === 'pull') { + await this.pull(); + } + } finally { + this._syncLock = false; } } - public startSync({ interval }: { + public async startSync({ interval }: { interval: string }): Promise { // Convert the interval string to milliseconds. const intervalMilliseconds = ms(interval); - return new Promise((resolve, reject) => { - - const intervalSync = async () => { - if (this._syncIntervalId) { - clearInterval(this._syncIntervalId); - } + const intervalSync = async () => { + if (this._syncLock) { + return; + } - try { - await this.push(); - await this.pull(); - } catch (error: any) { - this.stopSync(); - reject(error); - } + clearInterval(this._syncIntervalId); + this._syncIntervalId = undefined; + await this.sync(); - // then we start sync again + if (!this._syncIntervalId) { this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds); - }; + } + }; - this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds); - }); + if (this._syncIntervalId) { + clearInterval(this._syncIntervalId); + } + + // Set up a new interval. + this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds); + + // initiate an immediate sync + if (!this._syncLock) { + await this.sync(); + } } public stopSync(): void { @@ -526,7 +536,16 @@ export class SyncEngineLevel implements SyncEngine { // iterate over all registered identities for await (const [ did, options ] of this._db.sublevel('registeredIdentities').iterator()) { - const { protocols, delegateDid } = JSON.parse(options) as SyncIdentityOptions; + + const { protocols, delegateDid } = await new Promise((resolve) => { + try { + const { protocols, delegateDid } = JSON.parse(options) as SyncIdentityOptions; + resolve({ protocols, delegateDid }); + } catch(error: any) { + resolve({ protocols: [] }); + } + }); + // First, confirm the DID can be resolved and extract the DWN service endpoint URLs. const dwnEndpointUrls = await getDwnServiceEndpointUrls(did, this.agent.did); if (dwnEndpointUrls.length === 0) { diff --git a/packages/agent/src/types/sync.ts b/packages/agent/src/types/sync.ts index 1fc5666d3..e37dfb508 100644 --- a/packages/agent/src/types/sync.ts +++ b/packages/agent/src/types/sync.ts @@ -1,13 +1,43 @@ import type { Web5PlatformAgent } from './agent.js'; +/** + * The SyncEngine is responsible for syncing messages between the agent and the platform. + */ export type SyncIdentityOptions = { + /** + * The delegate DID that should be used to sign the sync messages. + */ delegateDid?: string; + /** + * The protocols that should be synced for this identity, if an empty array is provided, all messages for all protocols will be synced. + */ protocols: string[] } export interface SyncEngine { + /** + * The agent that the SyncEngine is attached to. + */ agent: Web5PlatformAgent; + /** + * Register an identity to be managed by the SyncEngine for syncing. + * The options can define specific protocols that should only be synced, or a delegate DID that should be used to sign the sync messages. + */ registerIdentity(params: { did: string, options?: SyncIdentityOptions }): Promise; + /** + * Preforms a one-shot sync operation. If no direction is provided, it will perform both push and pull. + * @param direction which direction you'd like to perform the sync operation. + * + * @throws {Error} if a sync is already in progress or the sync operation fails. + */ sync(direction?: 'push' | 'pull'): Promise; + /** + * Starts a periodic sync that runs at an interval. Subsequent calls to startSync will update the interval. + * + * @param params { interval: string } the interval at which the sync operation should be performed. ex: '30s', '1m', '10m' + */ startSync(params: { interval: string }): Promise; + /** + * Stops the periodic sync operation, will complete the current sync operation if one is already in progress. + */ stopSync(): void; } \ No newline at end of file diff --git a/packages/agent/tests/sync-engine-level.spec.ts b/packages/agent/tests/sync-engine-level.spec.ts index 5037f6a30..e2f8a405f 100644 --- a/packages/agent/tests/sync-engine-level.spec.ts +++ b/packages/agent/tests/sync-engine-level.spec.ts @@ -465,24 +465,41 @@ describe('SyncEngineLevel', () => { expect(pullSpy.calledOnce).to.be.true; }); - it('throws if sync is attempted while an interval sync is running', async () => { + it('throws an error if the sync is currently already running', async () => { // Register Alice's DID to be synchronized. await testHarness.agent.sync.registerIdentity({ did: alice.did.uri, }); - // start the sync engine with an interval of 10 seconds - syncEngine.startSync({ interval: '10s' }); + const clock = sinon.useFakeTimers(); + sinon.stub(syncEngine as any, 'push').resolves(); + const pullSpy = sinon.stub(syncEngine as any, 'pull'); + pullSpy.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 90); + })); + // do not await + syncEngine.sync(); + + await clock.tickAsync(50); + + // do not block for subsequent syncs + pullSpy.returns(Promise.resolve()); try { - // Execute Sync to push and pull all records from Alice's remote DWN to Alice's local DWN. await syncEngine.sync(); - expect.fail('Expected an error to be thrown'); - } catch (error: any) { - // Execute Sync to push and pull all records from Alice's remote DWN to Alice's local DWN. - expect(error.message).to.equal('SyncEngineLevel: Cannot call sync while a sync interval is active. Call `stopSync()` first.'); + } catch(error:any) { + expect(error.message).to.equal('SyncEngineLevel: Sync operation is already in progress.'); } + + await clock.tickAsync(50); + + // no error thrown + await syncEngine.sync(); + + clock.restore(); }); }); @@ -2436,8 +2453,9 @@ describe('SyncEngineLevel', () => { pushSpy.restore(); clock.restore(); - expect(pullSpy.callCount).to.equal(2, 'push'); - expect(pushSpy.callCount).to.equal(2, 'pull'); + // one when starting the sync, and another for each interval + expect(pullSpy.callCount).to.equal(3, 'push'); + expect(pushSpy.callCount).to.equal(3, 'pull'); }); it('does not call sync() again until a sync round finishes', async () => { @@ -2461,18 +2479,112 @@ describe('SyncEngineLevel', () => { await clock.tickAsync(1_400); // less time than the push + // only once for when starting the sync expect(pullSpy.callCount).to.equal(1, 'pull'); expect(pullSpy.callCount).to.equal(1, 'push'); - await clock.tickAsync(600); //remaining time for a 2nd sync + await clock.tickAsync(200); //remaining time and one interval + // once when starting, and once for the interval expect(pullSpy.callCount).to.equal(2, 'pull'); expect(pushSpy.callCount).to.equal(2, 'push'); + await clock.tickAsync(500); // one more interval + + // one more for the interval + expect(pullSpy.callCount).to.equal(3, 'pull'); + expect(pushSpy.callCount).to.equal(3, 'push'); + pullSpy.restore(); pushSpy.restore(); clock.restore(); }); + + it('calls sync once per interval with the latest interval timer being respected', async () => { + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri, + }); + + const clock = sinon.useFakeTimers(); + + const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync'); + // set to be a sync time longer than the interval + syncSpy.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 1_000); + })); + + testHarness.agent.sync.startSync({ interval: '500ms' }); + + await clock.tickAsync(1_400); // less than the initial interval + the sync time + + // once for the initial call and once for each interval call + expect(syncSpy.callCount).to.equal(2); + + // set to be a short sync time + syncSpy.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 15); + })); + + testHarness.agent.sync.startSync({ interval: '300ms' }); + + await clock.tickAsync(301); // exactly the new interval + 1 + + // one for the initial 'startSync' call and one for each interval call + expect(syncSpy.callCount).to.equal(4); + + + await clock.tickAsync(601); // two more intervals + + expect(syncSpy.callCount).to.equal(6); + + syncSpy.restore(); + clock.restore(); + }); + + it('should replace the interval timer with the latest interval timer', async () => { + + await testHarness.agent.sync.registerIdentity({ + did: alice.did.uri, + }); + + const clock = sinon.useFakeTimers(); + + const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync'); + // set to be a sync time longer than the interval + syncSpy.returns(new Promise((resolve) => { + clock.setTimeout(() => { + resolve(); + }, 100); + })); + + testHarness.agent.sync.startSync({ interval: '500ms' }); + + // two intervals + await clock.tickAsync(1_001); + + // this should equal 3, once for the initial call and once for each interval call + expect(syncSpy.callCount).to.equal(3); + + syncSpy.resetHistory(); + testHarness.agent.sync.startSync({ interval: '200ms' }); + + await clock.tickAsync(401); // two intervals + + // one for the initial 'startSync' call and one for each interval call + expect(syncSpy.callCount).to.equal(3); + + await clock.tickAsync(401); // two more intervals + + // one additional calls for each interval + expect(syncSpy.callCount).to.equal(5); + + syncSpy.restore(); + clock.restore(); + }); }); }); }); \ No newline at end of file diff --git a/packages/api/tests/dwn-api.spec.ts b/packages/api/tests/dwn-api.spec.ts index 08663e149..defad0c1c 100644 --- a/packages/api/tests/dwn-api.spec.ts +++ b/packages/api/tests/dwn-api.spec.ts @@ -98,6 +98,11 @@ describe('DwnApi', () => { await delegateHarness.createAgentDid(); }); + after(async () => { + await delegateHarness.clearStorage(); + await delegateHarness.closeStorage(); + }); + beforeEach(async () => { sinon.restore(); await delegateHarness.syncStore.clear(); diff --git a/packages/api/tests/record.spec.ts b/packages/api/tests/record.spec.ts index 871324aff..29f94bd57 100644 --- a/packages/api/tests/record.spec.ts +++ b/packages/api/tests/record.spec.ts @@ -124,6 +124,11 @@ describe('Record', () => { await delegateHarness.createAgentDid(); }); + after(async () => { + await delegateHarness.clearStorage(); + await delegateHarness.closeStorage(); + }); + beforeEach(async () => { sinon.restore(); await delegateHarness.syncStore.clear();