Skip to content

Commit

Permalink
Introducing version vector to solve GC problem (#899)
Browse files Browse the repository at this point in the history
This change introduces Lamport Synced Version Vector to resolve defects in the 
existing garbage collection system that used syncedSeq. Key improvements include:

- Added Version Vector implementation with Lamport timestamp support
- Implemented database storage and update mechanisms for version vectors
- Created min version vector computation for safe garbage collection
- Added handling for detached client's version vectors to prevent memory leaks
- Updated change ID generation to incorporate version vector information

The Version Vector ensures all changes are properly synchronized across replicas
before garbage collection occurs, improving system reliability and reducing
memory waste from detached clients.

---------

Co-authored-by: Youngteac Hong <[email protected]>
  • Loading branch information
JOOHOJANG and hackerwins authored Oct 23, 2024
1 parent 5d4ded9 commit 2124689
Show file tree
Hide file tree
Showing 18 changed files with 1,664 additions and 180 deletions.
59 changes: 59 additions & 0 deletions packages/sdk/src/api/converter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { RGATreeList } from '@yorkie-js-sdk/src/document/crdt/rga_tree_list';
import { CRDTElement } from '@yorkie-js-sdk/src/document/crdt/element';
import { CRDTObject } from '@yorkie-js-sdk/src/document/crdt/object';
import { CRDTArray } from '@yorkie-js-sdk/src/document/crdt/array';
import { VersionVector } from '@yorkie-js-sdk/src/document/time/version_vector';
import { CRDTTreePos } from './../document/crdt/tree';
import {
RGATreeSplit,
Expand Down Expand Up @@ -78,6 +79,7 @@ import {
TreeNodes as PbTreeNodes,
TreePos as PbTreePos,
TreeNodeID as PbTreeNodeID,
VersionVector as PbVersionVector,
ValueType as PbValueType,
JSONElement_Tree as PbJSONElement_Tree,
JSONElement_Text as PbJSONElement_Text,
Expand Down Expand Up @@ -161,6 +163,7 @@ function toChangeID(changeID: ChangeID): PbChangeID {
clientSeq: changeID.getClientSeq(),
lamport: changeID.getLamport(),
actorId: toUint8Array(changeID.getActorID()),
versionVector: toVersionVector(changeID.getVersionVector()),
});
}

Expand All @@ -179,6 +182,21 @@ function toTimeTicket(ticket?: TimeTicket): PbTimeTicket | undefined {
});
}

/**
* `toVersionVector` converts the given model to Protobuf format.
*/
function toVersionVector(vector?: VersionVector): PbVersionVector | undefined {
if (!vector) {
return;
}

const pbVector = new PbVersionVector();
for (const [actorID, lamport] of vector) {
pbVector.vector[actorID] = BigInt(lamport.toString());
}
return pbVector;
}

/**
* `toValueType` converts the given model to Protobuf format.
*/
Expand Down Expand Up @@ -780,6 +798,7 @@ function toChangePack(pack: ChangePack<Indexable>): PbChangePack {
isRemoved: pack.getIsRemoved(),
changes: toChanges(pack.getChanges()),
snapshot: pack.getSnapshot(),
versionVector: toVersionVector(pack.getVersionVector()),
minSyncedTicket: toTimeTicket(pack.getMinSyncedTicket()),
});
}
Expand Down Expand Up @@ -810,10 +829,28 @@ function fromChangeID(pbChangeID: PbChangeID): ChangeID {
pbChangeID.clientSeq,
BigInt(pbChangeID.lamport),
toHexString(pbChangeID.actorId),
fromVersionVector(pbChangeID.versionVector)!,
BigInt(pbChangeID.serverSeq),
);
}

/**
* `fromVersionVector` converts the given Protobuf format to model format.
*/
function fromVersionVector(
pbVersionVector?: PbVersionVector,
): VersionVector | undefined {
if (!pbVersionVector) {
return;
}

const vector = new VersionVector();
Object.entries(pbVersionVector.vector).forEach(([key, value]) => {
vector.set(key, BigInt(value.toString()));
});
return vector;
}

/**
* `fromTimeTicket` converts the given Protobuf format to model format.
*/
Expand Down Expand Up @@ -1324,6 +1361,7 @@ function fromChangePack<P extends Indexable>(
fromCheckpoint(pbPack.checkpoint!),
pbPack.isRemoved,
fromChanges(pbPack.changes),
fromVersionVector(pbPack.versionVector),
pbPack.snapshot,
fromTimeTicket(pbPack.minSyncedTicket),
);
Expand Down Expand Up @@ -1470,6 +1508,25 @@ function bytesToSnapshot<P extends Indexable>(
};
}

/**
* `versionVectorToHex` converts the given VersionVector to bytes.
*/
function versionVectorToHex(vector: VersionVector): string {
const pbVersionVector = toVersionVector(vector)!;

return bytesToHex(pbVersionVector.toBinary());
}

/**
* `hexToVersionVector` creates a VersionVector from the given bytes.
*/
function hexToVersionVector(hex: string): VersionVector {
const bytes = hexToBytes(hex);
const pbVersionVector = PbVersionVector.fromBinary(bytes);

return fromVersionVector(pbVersionVector)!;
}

/**
* `bytesToObject` creates an JSONObject from the given byte array.
*/
Expand Down Expand Up @@ -1602,4 +1659,6 @@ export const converter = {
PbChangeID,
bytesToChangeID,
bytesToOperation,
versionVectorToHex,
hexToVersionVector,
};
6 changes: 6 additions & 0 deletions packages/sdk/src/api/yorkie/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ message ChangePack {
repeated Change changes = 4;
TimeTicket min_synced_ticket = 5;
bool is_removed = 6;
VersionVector version_vector = 7;
}

message Change {
Expand All @@ -60,6 +61,11 @@ message ChangeID {
int64 server_seq = 2;
int64 lamport = 3;
bytes actor_id = 4;
VersionVector version_vector = 5;
}

message VersionVector {
map<string, int64> vector = 1;
}

message Operation {
Expand Down
49 changes: 49 additions & 0 deletions packages/sdk/src/api/yorkie/v1/resources_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ export class ChangePack extends Message<ChangePack> {
*/
isRemoved = false;

/**
* @generated from field: yorkie.v1.VersionVector version_vector = 7;
*/
versionVector?: VersionVector;

constructor(data?: PartialMessage<ChangePack>) {
super();
proto3.util.initPartial(data, this);
Expand All @@ -243,6 +248,7 @@ export class ChangePack extends Message<ChangePack> {
{ no: 4, name: "changes", kind: "message", T: Change, repeated: true },
{ no: 5, name: "min_synced_ticket", kind: "message", T: TimeTicket },
{ no: 6, name: "is_removed", kind: "scalar", T: 8 /* ScalarType.BOOL */ },
{ no: 7, name: "version_vector", kind: "message", T: VersionVector },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ChangePack {
Expand Down Expand Up @@ -341,6 +347,11 @@ export class ChangeID extends Message<ChangeID> {
*/
actorId = new Uint8Array(0);

/**
* @generated from field: yorkie.v1.VersionVector version_vector = 5;
*/
versionVector?: VersionVector;

constructor(data?: PartialMessage<ChangeID>) {
super();
proto3.util.initPartial(data, this);
Expand All @@ -353,6 +364,7 @@ export class ChangeID extends Message<ChangeID> {
{ no: 2, name: "server_seq", kind: "scalar", T: 3 /* ScalarType.INT64 */ },
{ no: 3, name: "lamport", kind: "scalar", T: 3 /* ScalarType.INT64 */ },
{ no: 4, name: "actor_id", kind: "scalar", T: 12 /* ScalarType.BYTES */ },
{ no: 5, name: "version_vector", kind: "message", T: VersionVector },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ChangeID {
Expand All @@ -372,6 +384,43 @@ export class ChangeID extends Message<ChangeID> {
}
}

/**
* @generated from message yorkie.v1.VersionVector
*/
export class VersionVector extends Message<VersionVector> {
/**
* @generated from field: map<string, int64> vector = 1;
*/
vector: { [key: string]: bigint } = {};

constructor(data?: PartialMessage<VersionVector>) {
super();
proto3.util.initPartial(data, this);
}

static readonly runtime: typeof proto3 = proto3;
static readonly typeName = "yorkie.v1.VersionVector";
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "vector", kind: "map", K: 9 /* ScalarType.STRING */, V: {kind: "scalar", T: 3 /* ScalarType.INT64 */} },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): VersionVector {
return new VersionVector().fromBinary(bytes, options);
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): VersionVector {
return new VersionVector().fromJson(jsonValue, options);
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): VersionVector {
return new VersionVector().fromJsonString(jsonString, options);
}

static equals(a: VersionVector | PlainMessage<VersionVector> | undefined, b: VersionVector | PlainMessage<VersionVector> | undefined): boolean {
return proto3.util.equals(VersionVector, a, b);
}
}

/**
* @generated from message yorkie.v1.Operation
*/
Expand Down
93 changes: 79 additions & 14 deletions packages/sdk/src/document/change/change_id.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,35 @@ import {
InitialActorID,
} from '@yorkie-js-sdk/src/document/time/actor_id';
import { TimeTicket } from '@yorkie-js-sdk/src/document/time/ticket';
import { InitialVersionVector, VersionVector } from '../time/version_vector';

/**
* `ChangeID` is for identifying the Change. This is immutable.
*/
export class ChangeID {
// `clientSeq` is the sequence number of the client that created this change.
private clientSeq: number;

// `serverSeq` is optional and only present for changes stored on the server.
private serverSeq?: bigint;

// `lamport` and `actor` are the lamport clock and the actor of this change.
// This is used to determine the order of changes in logical time.
private lamport: bigint;
private actor: ActorID;
// `versionVector` is the vector clock of this change. This is used to
// determine the relationship is causal or not between changes.
private versionVector: VersionVector;

constructor(
clientSeq: number,
lamport: bigint,
actor: ActorID,
vector: VersionVector,
serverSeq?: bigint,
) {
this.clientSeq = clientSeq;
this.serverSeq = serverSeq;
this.lamport = lamport;
this.versionVector = vector;
this.actor = actor;
}

Expand All @@ -51,29 +58,56 @@ export class ChangeID {
clientSeq: number,
lamport: bigint,
actor: ActorID,
vector: VersionVector,
serverSeq?: bigint,
): ChangeID {
return new ChangeID(clientSeq, lamport, actor, serverSeq);
return new ChangeID(clientSeq, lamport, actor, vector, serverSeq);
}

/**
* `next` creates a next ID of this ID.
*/
public next(): ChangeID {
return new ChangeID(this.clientSeq + 1, this.lamport + 1n, this.actor);
const vector = this.versionVector.deepcopy();
vector.set(this.actor, this.lamport + 1n);

return new ChangeID(
this.clientSeq + 1,
this.lamport + 1n,
this.actor,
vector,
);
}

/**
* `syncLamport` syncs lamport timestamp with the given ID.
*
* {@link https://en.wikipedia.org/wiki/Lamport_timestamps#Algorithm}
* `syncClocks` syncs logical clocks with the given ID.
*/
public syncLamport(otherLamport: bigint): ChangeID {
if (otherLamport > this.lamport) {
return new ChangeID(this.clientSeq, otherLamport, this.actor);
}
public syncClocks(other: ChangeID): ChangeID {
const lamport =
other.lamport > this.lamport ? other.lamport + 1n : this.lamport + 1n;
const maxVersionVector = this.versionVector.max(other.versionVector);

const newID = new ChangeID(
this.clientSeq,
lamport,
this.actor,
maxVersionVector,
);
newID.versionVector.set(this.actor, lamport);
return newID;
}

return new ChangeID(this.clientSeq, this.lamport + 1n, this.actor);
/**
* `setClocks` sets the given clocks to this ID. This is used when the snapshot
* is given from the server.
*/
public setClocks(otherLamport: bigint, vector: VersionVector): ChangeID {
const lamport =
otherLamport > this.lamport ? otherLamport : this.lamport + 1n;
const maxVersionVector = this.versionVector.max(vector);
maxVersionVector.set(this.actor, lamport);

return ChangeID.of(this.clientSeq, lamport, this.actor, maxVersionVector);
}

/**
Expand All @@ -87,7 +121,26 @@ export class ChangeID {
* `setActor` sets the given actor.
*/
public setActor(actorID: ActorID): ChangeID {
return new ChangeID(this.clientSeq, this.lamport, actorID, this.serverSeq);
return new ChangeID(
this.clientSeq,
this.lamport,
actorID,
this.versionVector,
this.serverSeq,
);
}

/**
* `setVersionVector` sets the given version vector.
*/
public setVersionVector(versionVector: VersionVector): ChangeID {
return new ChangeID(
this.clientSeq,
this.lamport,
this.actor,
versionVector,
this.serverSeq,
);
}

/**
Expand Down Expand Up @@ -128,6 +181,13 @@ export class ChangeID {
return this.actor;
}

/**
* `getVersionVector` returns the version vector of this ID.
*/
public getVersionVector(): VersionVector {
return this.versionVector;
}

/**
* `toTestString` returns a string containing the meta data of this ID.
*/
Expand All @@ -142,4 +202,9 @@ export class ChangeID {
* `InitialChangeID` represents the initial state ID. Usually this is used to
* represent a state where nothing has been edited.
*/
export const InitialChangeID = new ChangeID(0, 0n, InitialActorID);
export const InitialChangeID = new ChangeID(
0,
0n,
InitialActorID,
InitialVersionVector,
);
Loading

0 comments on commit 2124689

Please sign in to comment.