
Commit

Finally a working flow
dfahlander committed Jul 23, 2024
1 parent a3ef4b1 commit 3e32413
Showing 3 changed files with 156 additions and 73 deletions.
24 changes: 23 additions & 1 deletion src/public/types/yjs-related.ts
@@ -77,17 +77,39 @@ export interface DucktypedAwareness extends DucktypedYObservable {


export interface YUpdateRow {
  /** The primary key in the update table. */
  i: number;

  /** The primary key of the row in the related table that holds the document property. */
  k: IndexableType;

  /** The Y update payload. */
  u: Uint8Array;

  /** Optional flag.
   *
   * 1 = LOCAL_CHANGE_MAYBE_UNSYNCED
   */
  f?: number;
}

export interface YSyncer {
  i: string;
  unsentFrom: number;
}

export interface YLastCompressed {
  i: 0;
  compressedUntil: number;
}

export interface DexieYProvider<YDoc = any> {
  readonly doc: YDoc;
  awareness?: any;
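For orientation: all three row shapes above share one physical update table, distinguished by the type of the primary key `i` (auto-incremented numbers for updates, the reserved key 0 for the compression pointer, strings for syncers). A hypothetical snapshot of such a table, with purely illustrative keys and payloads, could look like:

const exampleRows: Array<YUpdateRow | YSyncer | YLastCompressed> = [
  { i: 0, compressedUntil: 3 },                              // YLastCompressed: updates up to i=3 are compressed
  { i: 4, k: 'doc1', u: new Uint8Array([/* ... */]), f: 1 }, // YUpdateRow: local change, maybe unsynced
  { i: 5, k: 'doc2', u: new Uint8Array([/* ... */]) },       // YUpdateRow: foreign (already synced) change
  { i: 'MySyncer', unsentFrom: 5 },                          // YSyncer: updates from i=5 and up not yet sent
];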
167 changes: 100 additions & 67 deletions src/yjs/compressYDocs.ts
@@ -1,9 +1,15 @@
import { Dexie } from '../public/types/dexie';
//import Promise from '../helpers/promise';
import type { Table } from '../public/types/table';
import type {
  YLastCompressed,
  YSyncer,
  YUpdateRow,
} from '../public/types/yjs-related';
import { PromiseExtended } from '../public/types/promise-extended';
import { getYLibrary } from './getYLibrary';
import { RangeSet, getRangeSetIterator } from '../helpers/rangeset';
import { cmp } from '../functions/cmp';

/** Go through all Y.Doc tables in the entire local db and compress updates
*
@@ -20,87 +26,114 @@ export function compressYDocs(db: Dexie) {
      )
    ),
    Promise.resolve()
  ) as PromiseExtended<void>;
}

/** Compress an individual Y.Doc table */
function compressYDocsTable(
  db: Dexie,
  { updTable }: { prop: string; updTable: string }
) {
  const updTbl = db.table(updTable);
  return Promise.all([
    // Syncers (for example dexie-cloud-addon or other 3rd-party syncers). They may have unsentFrom set.
    updTbl
      .where('i')
      .startsWith('') // Syncers have string primary keys while updates have auto-incremented numbers.
      .toArray(),
    // lastCompressed (pointer to the last compressed update)
    updTbl.get(0),
  ]).then(([syncers, lastCompressed]: [YSyncer[], YLastCompressed]) => {
    const unsentFrom = Math.min(
      ...syncers.map((s) => s.unsentFrom || Infinity)
    );
    const compressedUntil = lastCompressed?.compressedUntil || 0;
    // Per updates-table:
    // 1. Find all updates after the compressedUntil marker and load them with toArray().
    // 2. If any local (flagged) update occurs at or after unsentFrom, stop right before the
    //    first such entry; otherwise include all updates regardless of unsentFrom.
    // 3. Now we know which keys have updates since the last compression, and how far we can
    //    go (up to unsentFrom, unless all additional updates are foreign).
    // 4. For every key that had updates, load its main update (the single update per key
    //    stored before the compressedUntil marker).
    // 5. For every key that had updates: compress the main update together with the
    //    additional updates, up to and including the position computed in step 2.
    // 6. Update compressedUntil to the `i` of the last compressed entry.
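    // Worked example (hypothetical numbers): with compressedUntil = 3, pending updates
    // i = 4..8, unsentFrom = 7 and the local flag f = 1 set on update 7, the loop below
    // collects the doc keys of updates 4, 5 and 6, breaks at update 7, and
    // lastUpdateToCompress ends up as 6. Had update 7 been a foreign (unflagged) update,
    // all of 4..8 would have been compressed.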
    return updTbl
      .where('i')
      .between(compressedUntil, Infinity, false, false)
      .toArray()
      .then((addedUpdates: YUpdateRow[]) => {
        if (addedUpdates.length <= 1) return; // Nothing to compress when there is at most one update.
        const docIdsToCompress = new RangeSet();
        let lastUpdateToCompress = compressedUntil + 1;
        for (let j = 0; j < addedUpdates.length; ++j) {
          const { i, f, k } = addedUpdates[j];
          if (i >= unsentFrom && f) break; // A local update that still needs to be synced was found. Stop compressing here.
          docIdsToCompress.addKey(k);
          lastUpdateToCompress = i;
        }
        let promise = Promise.resolve();
        let iter = getRangeSetIterator(docIdsToCompress);
        for (
          let keyIterRes = iter.next();
          !keyIterRes.done;
          keyIterRes = iter.next()
        ) {
          const key = keyIterRes.value.from; // or keyIterRes.value.to - they are the same.
          const addedUpdatesForDoc = addedUpdates.filter(
            (update) => cmp(update.k, key) === 0
          );
          if (addedUpdatesForDoc.length > 0) {
            promise = promise.then(() =>
              compressUpdatesForDoc(db, updTable, key, addedUpdatesForDoc)
            );
          }
        }
        return promise.then(() => {
          // Update lastCompressed atomically to the value we computed.
          // Do it with respect to the case when another job ran in parallel and
          // compressed one or more extra updates, updating lastCompressed before us.
          return db.transaction('rw', updTbl, () =>
            updTbl.get(0).then(
              (current) =>
                lastUpdateToCompress > (current?.compressedUntil ?? 0) &&
                updTbl.put({
                  i: 0,
                  compressedUntil: lastUpdateToCompress,
                })
            )
          );
        });
      });
  });
}

/** Compress the updates of an individual Y.Doc.
 *
 * Lists the updates for the Y.Doc and replaces them with a single compressed update.
 *
 * If there is a Syncer entry in the updates table (an entry where the primary key `i` is a string, not a number),
 * its `unsentFrom` value marks the first update that has not yet been sent to the server and therefore
 * must not be compressed. Sync addons may store their syncers in the update table, keyed by a string
 * primary key instead of a number, to keep track of which updates have been sent to their server. Reusing
 * the updates table for syncers is a bit special, but it tracks which updates have been sent to the server
 * without requiring a separate table.
 *
 * @param db Dexie instance
 * @param updTable Name of the table where updates are stored
 * @param docRowId The primary key of the row in the related table that holds the Y.Doc property.
 * @param addedUpdatesToCompress The updates, added since the last compression, to merge into the main update.
 * @returns
 */
export function compressUpdatesForDoc(
  db: Dexie,
  updTable: string,
  docRowId: any,
  addedUpdatesToCompress: YUpdateRow[]
) {
  if (addedUpdatesToCompress.length < 1) throw new Error('Invalid input');
  return db.transaction('rw', updTable, (tx) => {
    const updTbl = tx.table(updTable);
    return updTbl.where({ k: docRowId }).first((mainUpdate: YUpdateRow) => {
      const updates = [mainUpdate].concat(addedUpdatesToCompress); // In some situations, mainUpdate will be included twice here. But Y.js doesn't care!
      const Y = getYLibrary(db);
      const doc = new Y.Doc({ gc: true });
      updates.forEach((update) => {
        if (cmp(update.k, docRowId) !== 0) {
          throw new Error('Invalid update');
        }
        Y.applyUpdateV2(doc, update.u);
      });
      const compressedUpdate = Y.encodeStateAsUpdateV2(doc);
      const lastUpdate = updates.pop();
      return updTbl
        .put({
          i: lastUpdate.i,
          k: docRowId,
          u: compressedUpdate,
        })
        .then(() =>
          updTbl.bulkDelete(
            updates
              .filter((update) => update.i !== lastUpdate.i) // don't delete the row we just rewrote if mainUpdate was duplicated
              .map((update) => update.i)
          )
        );
    });
  });
}
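The compression step itself is a plain Y.js round-trip: apply every stored V2 update to a scratch Y.Doc, then re-encode the whole document state as one V2 update. A minimal standalone sketch of that primitive, assuming the yjs npm package (the helper name is illustrative, not part of this commit):

import * as Y from 'yjs';

// Merge several V2-encoded updates into one equivalent update.
// Round-tripping through a Y.Doc with gc enabled also garbage-collects
// deleted content, which plain Y.mergeUpdatesV2 would have to keep.
function mergeUpdatesV2ViaDoc(updates: Uint8Array[]): Uint8Array {
  const doc = new Y.Doc({ gc: true });
  for (const u of updates) {
    Y.applyUpdateV2(doc, u); // replay each update onto the scratch doc
  }
  return Y.encodeStateAsUpdateV2(doc); // one update representing the full state
}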
38 changes: 33 additions & 5 deletions test/tests-yjs.js
@@ -105,18 +105,44 @@ promisedTest('Test Y document compression', async () => {
doc.getArray('arr').insert(0, ['x', 'y', 'z']);
});
// Verify we have 3 updates:
equal(await db.table(updateTable).where('i').between(1,Infinity).count(), 3, 'Three updates stored');
// Run the GC:
console.debug('Running GC', await db.table(updateTable).toArray());
await db.gc();
console.debug('After running GC', await db.table(updateTable).toArray());
// Verify we have 1 (compressed) update:
equal(await db.table(updateTable).where('i').between(1,Infinity).count(), 1, 'One update stored after gc');
// Verify the provider is still alive:
ok(!provider.destroyed, "Provider is not destroyed");
await db.transaction('rw', db.docs, () => {
doc.getArray('arr').insert(0, ['a', 'b', 'c']);
doc.getArray('arr').insert(0, ['1', '2', '3']);
doc.getArray('arr').insert(0, ['x', 'y', 'z']);
});
equal(await db.table(updateTable).where('i').between(1,Infinity).count(), 4, 'Four updates stored after additional inserts');
await db.gc();
equal(await db.table(updateTable).where('i').between(1,Infinity).count(), 1, 'One update stored after gc');
await db.docs.put({
id: 'doc2',
title: 'Hello2',
});
let row2 = await db.docs.get('doc2');
let doc2 = row2.content;
await new DexieYProvider(doc2).whenLoaded;
await db.transaction('rw', db.docs, async () => {
doc2.getArray('arr2').insert(0, ['a', 'b', 'c']);
doc2.getArray('arr2').insert(0, ['1', '2', '3']);
doc2.getArray('arr2').insert(0, ['x', 'y', 'z']);
});
equal(await db.table(updateTable).where('i').between(1,Infinity).count(), 4, 'Four updates stored after additional inserts');
await db.gc();
equal(await db.table(updateTable).where('i').between(1,Infinity).count(), 2, 'Two updates stored after gc (2 different docs)');

// Now clear the docs table, which should implicitly clear the updates as well as destroying connected providers:
await db.docs.clear();
// Verify there are no updates now:
equal(
  await db.table(updateTable).where('i').between(1, Infinity).count(),
  0,
  'Zero updates stored after clearing docs'
);
@@ -138,7 +164,7 @@ promisedTest('Test that syncers prohibit GC from compressing unsynced updates',
const updateTable = db.docs.schema.yProps.find(
(p) => p.prop === 'content'
).updTable;
equal(await db.table(updateTable).where('i').between(1,Infinity).count(), 0, 'No updates stored yet');

// Create three updates:
await db.transaction('rw', db.docs, () => {
@@ -147,15 +173,17 @@ promisedTest('Test that syncers prohibit GC from compressing unsynced updates',
doc.getArray('arr').insert(0, ['x', 'y', 'z']);
});
// Verify we have 3 updates:
equal(await db.table(updateTable).where('i').between(1,Infinity).count(), 3, 'Three updates stored');

// Put a syncer in place that will not sync the updates:
await db.table(updateTable).put({
i: "MySyncer",
unsentFrom: await db.table(updateTable).orderBy('i').lastKey(), // Keep the last update and updates after that from being compressed
});

console.debug('Running GC');
await db.gc();
console.debug('After running GC');
// Verify we have 2 updates (the first 2 were compressed but the last one was not):
equal(await db.table(updateTable).where('i').between(1, Infinity).count(), 2, '2 updates stored');
await db.docs.delete(row.id);
