Skip to content

Commit

Permalink
feat: add function processing logs from g2
Browse files Browse the repository at this point in the history
Signed-off-by: Yogesh01000100 <[email protected]>
  • Loading branch information
Yogesh01000100 committed Nov 15, 2024
1 parent eaac8fd commit 5ce06a8
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 71 deletions.
2 changes: 1 addition & 1 deletion packages/cactus-plugin-satp-hermes/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,4 @@
"runOnChangeOnly": true
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ message RollbackAckMessage {
}

message LocalLog {
string key=1;
string sessionId=2;
string data=3;
string type=4;
string operation=5;
string timestamp=6;
string sessionID = 1;
string type = 2;
string key = 3;
string operation = 4;
string timestamp = 5;
string data = 6;
}

message RollbackLogEntry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,49 @@ export class CrashRecoveryManager {
}
}

private processRecoverUpdate(
private async processRecoverUpdate(
message: RecoverUpdateMessage,
): Promise<boolean> {
this.log.debug("Message received: ", message.messageType);
// get the logs from counterparty gateway and sync with current session
this.log.debug(`Session processed & updated with RecoverUpdateMessage`);
const fnTag = `${this.className}#processRecoverUpdate()`;
try {
const sessionId = message.sessionId;
const recoveredLogs = message.recoveredLogs;

if (!recoveredLogs || recoveredLogs.length === 0) {
this.log.warn(`${fnTag} No recovered logs to process.`);
return true;
}

for (const logEntry of recoveredLogs) {
await this.logRepository.create(logEntry);
}

const allLogs = await this.logRepository.readLogsBySessionId(sessionId);

if (!allLogs || allLogs.length === 0) {
this.log.error(`${fnTag} No logs found for session ID: ${sessionId}`);
return false;
}

let reconstructedSessionData = new SessionData();

return Promise.resolve(true);
for (const logEntry of allLogs) {
const data = JSON.parse(logEntry.data);
reconstructedSessionData = data;
this.sessions.set(sessionId, reconstructedSessionData);
}

this.log.info(
`${fnTag} Session data successfully reconstructed for session ID: ${sessionId}`,
);

return true;
} catch (error) {
this.log.error(
`${fnTag} Error processing RecoverUpdateMessage: ${error}`,
);
return false;
}
}

public async initiateRollback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ export class CrashRecoveryService {
request: RecoverMessage,
): Promise<RecoverUpdateMessage> {
this.log.debug("Creating RecoverUpdateMessage...");
const recoveredLogs =
await this.logRepository.readLogsMoreRecentThanTimestamp(
request.lastEntryTimestamp.toString(),
);
const recoveredLogs = await this.logRepository.fetchLogsFromSequence(
request.sessionId,
);
return new RecoverUpdateMessage({
sessionId: request.sessionId,
messageType: "urn:ietf:SATP-2pc:msgtype:recover-update-msg",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,6 @@ import {
import { RollbackState } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb";
import { ILocalLogRepository } from "../../../repository/interfaces/repository";

/*export interface RollbackState {
currentStage: string;
// todo add rollback state
// placeholder, should import RollbackLogEntry from protos.
// RollbackLogEntry in spec = RollbackState in code
}*/

export interface RollbackStrategy {
execute(session: SATPSession): Promise<RollbackState>;
// todo do we want to return any information?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,34 +352,34 @@ export class RollbackAckMessage extends Message<RollbackAckMessage> {
*/
export class LocalLog extends Message<LocalLog> {
/**
* @generated from field: string key = 1;
* @generated from field: string sessionID = 1;
*/
key = "";
sessionID = "";

/**
* @generated from field: string sessionId = 2;
* @generated from field: string type = 2;
*/
sessionId = "";
type = "";

/**
* @generated from field: string data = 3;
* @generated from field: string key = 3;
*/
data = "";
key = "";

/**
* @generated from field: string type = 4;
* @generated from field: string operation = 4;
*/
type = "";
operation = "";

/**
* @generated from field: string operation = 5;
* @generated from field: string timestamp = 5;
*/
operation = "";
timestamp = "";

/**
* @generated from field: string timestamp = 6;
* @generated from field: string data = 6;
*/
timestamp = "";
data = "";

constructor(data?: PartialMessage<LocalLog>) {
super();
Expand All @@ -389,12 +389,12 @@ export class LocalLog extends Message<LocalLog> {
static readonly runtime: typeof proto3 = proto3;
static readonly typeName = "cacti.satp.v02.crash.LocalLog";
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "key", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 2, name: "sessionId", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 3, name: "data", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 4, name: "type", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 5, name: "operation", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 6, name: "timestamp", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 1, name: "sessionID", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 2, name: "type", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 3, name: "key", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 4, name: "operation", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 5, name: "timestamp", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 6, name: "data", kind: "scalar", T: 9 /* ScalarType.STRING */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): LocalLog {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export interface ILocalLogRepository extends IRepository<LocalLog, string> {
readLastestLog(sessionID: string): Promise<LocalLog>;
create(log: LocalLog): Promise<LocalLog>;
deleteBySessionId(log: string): any;
fetchLogsFromSequence(id: string): Promise<LocalLog[]>;
readLogsBySessionId(sessionId: string): Promise<LocalLog[]>;
destroy(): any;
reset(): any;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ export class KnexLocalLogRepository implements ILocalLogRepository {
.groupBy("sessionID");
}

fetchLogsFromSequence(sessionId: string): Promise<LocalLog[]> {
return this.getLogsTable().where("sessionID", sessionId);
}

readLogsBySessionId(sessionId: string): Promise<LocalLog[]> {
return this.getLogsTable()
.where({ sessionID: sessionId })
.orderBy("timestamp", "asc");
}

async reset() {
await this.database.migrate.rollback();
await this.database.migrate.latest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { SATPSession } from "../../../../main/typescript/core/satp-session";
import { knexClientConnection } from "../../knex.config";
import { getSatpLogKey } from "../../../../main/typescript/gateway-utils";
import { TokenType } from "../../../../main/typescript/core/stage-services/satp-bridge/types/asset";
import { RecoverUpdateMessage } from "../../../../main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb";

const logLevel: LogLevelDesc = "DEBUG";

Expand Down Expand Up @@ -86,6 +87,7 @@ const createMockSession = (maxTimeout: string, maxRetries: string) => {
sessionData.receiverAsset.contractName = "MOCK_RECEIVER_ASSET_CONTRACT_NAME";
sessionData.receiverAsset.mspId = "MOCK_RECEIVER_ASSET_MSP_ID";
sessionData.receiverAsset.channelName = "MOCK_CHANNEL_ID";
sessionData.lastSequenceNumber = BigInt(4);

return mockSession;
};
Expand Down Expand Up @@ -337,4 +339,38 @@ describe("CrashRecoveryManager Tests", () => {
handleRecoverySpy.mockRestore();
handleInitiateRollBackSpy.mockRestore();
});

it("should process recovered logs and reconstruct SessionData", async () => {
const mockSession = createMockSession("1000", "3");
const testData = mockSession.hasClientSessionData()
? mockSession.getClientSessionData()
: mockSession.getServerSessionData();

const recoveredLogs: LocalLog[] = [
{
sessionID: sessionId,
type: "type_1",
key: getSatpLogKey(sessionId, "type_1", "init"),
operation: "init",
timestamp: new Date().toISOString(),
data: JSON.stringify(testData),
},
];

const recoverUpdateMessage = {
sessionId: sessionId,
messageType: "urn:ietf:SATP-2pc:msgtype:recover-update-msg",
hashRecoverMessage: "",
recoveredLogs: recoveredLogs,
senderSignature: "",
} as RecoverUpdateMessage;

const result =
await crashManager["processRecoverUpdate"](recoverUpdateMessage);

expect(result).toBeTrue();

const reconstructedSessionData = crashManager["sessions"].get(sessionId);
expect(reconstructedSessionData).toBeDefined();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ beforeAll(async () => {
});

describe("Crash Recovery Services Testing", () => {
it("handle reover function test", async () => {
it("handle recover function test", async () => {
mockSession = createMockSession();

const testData = mockSession.hasClientSessionData()
Expand Down
50 changes: 21 additions & 29 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9142,9 +9142,8 @@ __metadata:
make-dir-cli: "npm:3.1.0"
npm-run-all: "npm:4.1.5"
openzeppelin-solidity: "npm:3.4.2"
pg: "npm:^8.8.0"
pg: "npm:^8.13.0"
protobufjs: "npm:7.2.5"
safe-stable-stringify: "npm:2.5.0"
secp256k1: "npm:4.0.3"
socket.io: "npm:4.6.2"
sqlite3: "npm:5.1.5"
Expand Down Expand Up @@ -40544,14 +40543,7 @@ __metadata:
languageName: node
linkType: hard

"pg-connection-string@npm:^2.6.4":
version: 2.6.4
resolution: "pg-connection-string@npm:2.6.4"
checksum: 10/2c1d2ac1add1f93076f1594d217a0980f79add05dc48de6363e1c550827c78a6ee3e3b5420da9c54858f6b678cdb348aed49732ee68158b6cdb70f1d1c748cf9
languageName: node
linkType: hard

"pg-connection-string@npm:^2.5.0":
"pg-connection-string@npm:^2.7.0":
version: 2.7.0
resolution: "pg-connection-string@npm:2.7.0"
checksum: 10/68015a8874b7ca5dad456445e4114af3d2602bac2fdb8069315ecad0ff9660ec93259b9af7186606529ac4f6f72a06831e6f20897a689b16cc7fda7ca0e247fd
Expand All @@ -40574,12 +40566,12 @@ __metadata:
languageName: node
linkType: hard

"pg-pool@npm:^3.6.2":
version: 3.6.2
resolution: "pg-pool@npm:3.6.2"
"pg-pool@npm:^3.7.0":
version: 3.7.0
resolution: "pg-pool@npm:3.7.0"
peerDependencies:
pg: ">=8.0"
checksum: 10/d5ccefb9a4913c737e07106ada841c7d8f2b110b02ef6b4cee198e1e7e758bac43cb3b6df7646e25858b9fe300db00f2f349868296fbd4b3b4c99c15906d1596
checksum: 10/a07a4f9e26eec9d7ac3597dc7b3469c62983edff9a321dbb7acbe1bbc7f5e9b2d33438e277d4cf8145071f3d63c7ebdc287a539fd69dfb8cdddb15b33eefe1a2
languageName: node
linkType: hard

Expand All @@ -40590,10 +40582,10 @@ __metadata:
languageName: node
linkType: hard

"pg-protocol@npm:^1.6.1":
version: 1.6.1
resolution: "pg-protocol@npm:1.6.1"
checksum: 10/9af672208adae8214f55f5b4597c4699ab9946205a99863d3e2bb8d024fdab16711457b539bc366cc29040218aa87508cf61294b76d288f48881b973d9117bd6
"pg-protocol@npm:^1.7.0":
version: 1.7.0
resolution: "pg-protocol@npm:1.7.0"
checksum: 10/ffffdf74426c9357b57050f1c191e84447c0e8b2a701b3ab302ac7dd0eb27b862d92e5e3b2d38876a1051de83547eb9165d6a58b3a8e90bb050dae97f9993d54
languageName: node
linkType: hard

Expand Down Expand Up @@ -40630,14 +40622,14 @@ __metadata:
languageName: node
linkType: hard

"pg@npm:^8.8.0":
version: 8.12.0
resolution: "pg@npm:8.12.0"
"pg@npm:^8.13.0":
version: 8.13.1
resolution: "pg@npm:8.13.1"
dependencies:
pg-cloudflare: "npm:^1.1.1"
pg-connection-string: "npm:^2.6.4"
pg-pool: "npm:^3.6.2"
pg-protocol: "npm:^1.6.1"
pg-connection-string: "npm:^2.7.0"
pg-pool: "npm:^3.7.0"
pg-protocol: "npm:^1.7.0"
pg-types: "npm:^2.1.0"
pgpass: "npm:1.x"
peerDependencies:
Expand All @@ -40648,7 +40640,7 @@ __metadata:
peerDependenciesMeta:
pg-native:
optional: true
checksum: 10/ce39af0e85d42bf5fc8dcc02c57b38d4cb203fea937688509a77c0b005a54d4821e5e5963a5663934d76994eab42381698f08a44e21544b4545fd9d142dcfd12
checksum: 10/542aa49fcb37657cf5f779b4a31fe6eb336e683445ecca38e267eeb0ca85d873ffe51f04794f9f9e184187e9f74bf7895e932a0fa9507132ac0dfc76c7c73451
languageName: node
linkType: hard

Expand Down Expand Up @@ -44861,10 +44853,10 @@ __metadata:
languageName: node
linkType: hard

"safe-stable-stringify@npm:2.5.0, safe-stable-stringify@npm:^2.3.1, safe-stable-stringify@npm:^2.4.3":
version: 2.5.0
resolution: "safe-stable-stringify@npm:2.5.0"
checksum: 10/2697fa186c17c38c3ca5309637b4ac6de2f1c3d282da27cd5e1e3c88eca0fb1f9aea568a6aabdf284111592c8782b94ee07176f17126031be72ab1313ed46c5c
"safe-stable-stringify@npm:^2.3.1":
version: 2.3.1
resolution: "safe-stable-stringify@npm:2.3.1"
checksum: 10/8a6ed4e5fb80694970f1939538518c44a59c71c74305e12b5964cbe3850636212eddac881da1f676b0232015213676e07750fe75bc402afbfe29851c8b52381e
languageName: node
linkType: hard

Expand Down

0 comments on commit 5ce06a8

Please sign in to comment.