From 5ce06a8dd85b3085fa16001dc1fc70642a165c82 Mon Sep 17 00:00:00 2001 From: Yogesh01000100 Date: Sun, 3 Nov 2024 19:08:30 +0000 Subject: [PATCH] feat: add function processing logs from g2 Signed-off-by: Yogesh01000100 --- .../cactus-plugin-satp-hermes/package.json | 2 +- .../proto/cacti/satp/v02/crash_recovery.proto | 12 ++--- .../typescript/core/recovery/crash-manager.ts | 45 +++++++++++++++-- .../typescript/core/recovery/crash-utils.ts | 7 ++- .../rollback/rollback-strategy-factory.ts | 7 --- .../proto/cacti/satp/v02/crash_recovery_pb.ts | 36 ++++++------- .../repository/interfaces/repository.ts | 2 + .../repository/knex-local-log-repository.ts | 10 ++++ .../typescript/unit/recovery/logging.test.ts | 36 +++++++++++++ .../typescript/unit/recovery/services.test.ts | 2 +- yarn.lock | 50 ++++++++----------- 11 files changed, 138 insertions(+), 71 deletions(-) diff --git a/packages/cactus-plugin-satp-hermes/package.json b/packages/cactus-plugin-satp-hermes/package.json index b1057964fe..b07941a044 100644 --- a/packages/cactus-plugin-satp-hermes/package.json +++ b/packages/cactus-plugin-satp-hermes/package.json @@ -208,4 +208,4 @@ "runOnChangeOnly": true } } -} \ No newline at end of file +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto b/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto index d94d083b05..9447da1491 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto +++ b/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto @@ -63,12 +63,12 @@ message RollbackAckMessage { } message LocalLog { - string key=1; - string sessionId=2; - string data=3; - string type=4; - string operation=5; - string timestamp=6; + string sessionID = 1; + string type = 2; + string key = 3; + string operation = 4; + string timestamp = 5; + string data = 6; } message RollbackLogEntry { diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts index 93c7ac2d52..ffb51baff0 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts @@ -292,14 +292,49 @@ export class CrashRecoveryManager { } } - private processRecoverUpdate( + private async processRecoverUpdate( message: RecoverUpdateMessage, ): Promise { - this.log.debug("Message received: ", message.messageType); - // get the logs from counterparty gateway and sync with current session - this.log.debug(`Session processed & updated with RecoverUpdateMessage`); + const fnTag = `${this.className}#processRecoverUpdate()`; + try { + const sessionId = message.sessionId; + const recoveredLogs = message.recoveredLogs; + + if (!recoveredLogs || recoveredLogs.length === 0) { + this.log.warn(`${fnTag} No recovered logs to process.`); + return true; + } + + for (const logEntry of recoveredLogs) { + await this.logRepository.create(logEntry); + } + + const allLogs = await this.logRepository.readLogsBySessionId(sessionId); + + if (!allLogs || allLogs.length === 0) { + this.log.error(`${fnTag} No logs found for session ID: ${sessionId}`); + return false; + } + + let reconstructedSessionData = new SessionData(); - return Promise.resolve(true); + for (const logEntry of allLogs) { + const data = JSON.parse(logEntry.data); + reconstructedSessionData = data; + this.sessions.set(sessionId, reconstructedSessionData); + } + + this.log.info( + `${fnTag} Session data successfully reconstructed for session ID: ${sessionId}`, + ); + + return true; + } catch (error) { + this.log.error( + `${fnTag} Error processing RecoverUpdateMessage: ${error}`, + ); + return false; + } } public async initiateRollback( diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-utils.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-utils.ts index 77982d857a..95faa5c18e 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-utils.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-utils.ts @@ -29,10 +29,9 @@ export class CrashRecoveryService { request: RecoverMessage, ): Promise { this.log.debug("Creating RecoverUpdateMessage..."); - const recoveredLogs = - await this.logRepository.readLogsMoreRecentThanTimestamp( - request.lastEntryTimestamp.toString(), - ); + const recoveredLogs = await this.logRepository.fetchLogsFromSequence( + request.sessionId, + ); return new RecoverUpdateMessage({ sessionId: request.sessionId, messageType: "urn:ietf:SATP-2pc:msgtype:recover-update-msg", diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/rollback-strategy-factory.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/rollback-strategy-factory.ts index 54f432d8d3..3b187cf212 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/rollback-strategy-factory.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/rollback-strategy-factory.ts @@ -11,13 +11,6 @@ import { import { RollbackState } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; import { ILocalLogRepository } from "../../../repository/interfaces/repository"; -/*export interface RollbackState { - currentStage: string; - // todo add rollback state - // placeholder, should import RollbackLogEntry from protos. - // RollbackLogEntry in spec = RollbackState in code -}*/ - export interface RollbackStrategy { execute(session: SATPSession): Promise; // todo do we want to return any information? diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts index b4659aa2e8..92fae720f0 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts @@ -352,34 +352,34 @@ export class RollbackAckMessage extends Message { */ export class LocalLog extends Message { /** - * @generated from field: string key = 1; + * @generated from field: string sessionID = 1; */ - key = ""; + sessionID = ""; /** - * @generated from field: string sessionId = 2; + * @generated from field: string type = 2; */ - sessionId = ""; + type = ""; /** - * @generated from field: string data = 3; + * @generated from field: string key = 3; */ - data = ""; + key = ""; /** - * @generated from field: string type = 4; + * @generated from field: string operation = 4; */ - type = ""; + operation = ""; /** - * @generated from field: string operation = 5; + * @generated from field: string timestamp = 5; */ - operation = ""; + timestamp = ""; /** - * @generated from field: string timestamp = 6; + * @generated from field: string data = 6; */ - timestamp = ""; + data = ""; constructor(data?: PartialMessage) { super(); @@ -389,12 +389,12 @@ export class LocalLog extends Message { static readonly runtime: typeof proto3 = proto3; static readonly typeName = "cacti.satp.v02.crash.LocalLog"; static readonly fields: FieldList = proto3.util.newFieldList(() => [ - { no: 1, name: "key", kind: "scalar", T: 9 /* ScalarType.STRING */ }, - { no: 2, name: "sessionId", kind: "scalar", T: 9 /* ScalarType.STRING */ }, - { no: 3, name: "data", kind: "scalar", T: 9 /* ScalarType.STRING */ }, - { no: 4, name: "type", kind: "scalar", T: 9 /* ScalarType.STRING */ }, - { no: 5, name: "operation", kind: "scalar", T: 9 /* ScalarType.STRING */ }, - { no: 6, name: "timestamp", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 1, name: "sessionID", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 2, name: "type", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 3, name: "key", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 4, name: "operation", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 5, name: "timestamp", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 6, name: "data", kind: "scalar", T: 9 /* ScalarType.STRING */ }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): LocalLog { diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/repository/interfaces/repository.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/repository/interfaces/repository.ts index 8d1524398f..22c41fb4cf 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/repository/interfaces/repository.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/repository/interfaces/repository.ts @@ -15,6 +15,8 @@ export interface ILocalLogRepository extends IRepository { readLastestLog(sessionID: string): Promise; create(log: LocalLog): Promise; deleteBySessionId(log: string): any; + fetchLogsFromSequence(id: string): Promise; + readLogsBySessionId(sessionId: string): Promise; destroy(): any; reset(): any; } diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/repository/knex-local-log-repository.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/repository/knex-local-log-repository.ts index e5a71a4d9c..c9c2c90977 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/repository/knex-local-log-repository.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/repository/knex-local-log-repository.ts @@ -54,6 +54,16 @@ export class KnexLocalLogRepository implements ILocalLogRepository { .groupBy("sessionID"); } + fetchLogsFromSequence(sessionId: string): Promise { + return this.getLogsTable().where("sessionID", sessionId); + } + + readLogsBySessionId(sessionId: string): Promise { + return this.getLogsTable() + .where({ sessionID: sessionId }) + .orderBy("timestamp", "asc"); + } + async reset() { await this.database.migrate.rollback(); await this.database.migrate.latest(); diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/logging.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/logging.test.ts index a7f643a1df..8aa6ba3902 100644 --- a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/logging.test.ts +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/logging.test.ts @@ -23,6 +23,7 @@ import { SATPSession } from "../../../../main/typescript/core/satp-session"; import { knexClientConnection } from "../../knex.config"; import { getSatpLogKey } from "../../../../main/typescript/gateway-utils"; import { TokenType } from "../../../../main/typescript/core/stage-services/satp-bridge/types/asset"; +import { RecoverUpdateMessage } from "../../../../main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb"; const logLevel: LogLevelDesc = "DEBUG"; @@ -86,6 +87,7 @@ const createMockSession = (maxTimeout: string, maxRetries: string) => { sessionData.receiverAsset.contractName = "MOCK_RECEIVER_ASSET_CONTRACT_NAME"; sessionData.receiverAsset.mspId = "MOCK_RECEIVER_ASSET_MSP_ID"; sessionData.receiverAsset.channelName = "MOCK_CHANNEL_ID"; + sessionData.lastSequenceNumber = BigInt(4); return mockSession; }; @@ -337,4 +339,38 @@ describe("CrashRecoveryManager Tests", () => { handleRecoverySpy.mockRestore(); handleInitiateRollBackSpy.mockRestore(); }); + + it("should process recovered logs and reconstruct SessionData", async () => { + const mockSession = createMockSession("1000", "3"); + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + const recoveredLogs: LocalLog[] = [ + { + sessionID: sessionId, + type: "type_1", + key: getSatpLogKey(sessionId, "type_1", "init"), + operation: "init", + timestamp: new Date().toISOString(), + data: JSON.stringify(testData), + }, + ]; + + const recoverUpdateMessage = { + sessionId: sessionId, + messageType: "urn:ietf:SATP-2pc:msgtype:recover-update-msg", + hashRecoverMessage: "", + recoveredLogs: recoveredLogs, + senderSignature: "", + } as RecoverUpdateMessage; + + const result = + await crashManager["processRecoverUpdate"](recoverUpdateMessage); + + expect(result).toBeTrue(); + + const reconstructedSessionData = crashManager["sessions"].get(sessionId); + expect(reconstructedSessionData).toBeDefined(); + }); }); diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/services.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/services.test.ts index 706d87a74d..e121e7a702 100644 --- a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/services.test.ts +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/services.test.ts @@ -101,7 +101,7 @@ beforeAll(async () => { }); describe("Crash Recovery Services Testing", () => { - it("handle reover function test", async () => { + it("handle recover function test", async () => { mockSession = createMockSession(); const testData = mockSession.hasClientSessionData() diff --git a/yarn.lock b/yarn.lock index 160c1bbe5f..19f3e80e6a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -9142,9 +9142,8 @@ __metadata: make-dir-cli: "npm:3.1.0" npm-run-all: "npm:4.1.5" openzeppelin-solidity: "npm:3.4.2" - pg: "npm:^8.8.0" + pg: "npm:^8.13.0" protobufjs: "npm:7.2.5" - safe-stable-stringify: "npm:2.5.0" secp256k1: "npm:4.0.3" socket.io: "npm:4.6.2" sqlite3: "npm:5.1.5" @@ -40544,14 +40543,7 @@ __metadata: languageName: node linkType: hard -"pg-connection-string@npm:^2.6.4": - version: 2.6.4 - resolution: "pg-connection-string@npm:2.6.4" - checksum: 10/2c1d2ac1add1f93076f1594d217a0980f79add05dc48de6363e1c550827c78a6ee3e3b5420da9c54858f6b678cdb348aed49732ee68158b6cdb70f1d1c748cf9 - languageName: node - linkType: hard - -"pg-connection-string@npm:^2.5.0": +"pg-connection-string@npm:^2.7.0": version: 2.7.0 resolution: "pg-connection-string@npm:2.7.0" checksum: 10/68015a8874b7ca5dad456445e4114af3d2602bac2fdb8069315ecad0ff9660ec93259b9af7186606529ac4f6f72a06831e6f20897a689b16cc7fda7ca0e247fd @@ -40574,12 +40566,12 @@ __metadata: languageName: node linkType: hard -"pg-pool@npm:^3.6.2": - version: 3.6.2 - resolution: "pg-pool@npm:3.6.2" +"pg-pool@npm:^3.7.0": + version: 3.7.0 + resolution: "pg-pool@npm:3.7.0" peerDependencies: pg: ">=8.0" - checksum: 10/d5ccefb9a4913c737e07106ada841c7d8f2b110b02ef6b4cee198e1e7e758bac43cb3b6df7646e25858b9fe300db00f2f349868296fbd4b3b4c99c15906d1596 + checksum: 10/a07a4f9e26eec9d7ac3597dc7b3469c62983edff9a321dbb7acbe1bbc7f5e9b2d33438e277d4cf8145071f3d63c7ebdc287a539fd69dfb8cdddb15b33eefe1a2 languageName: node linkType: hard @@ -40590,10 +40582,10 @@ __metadata: languageName: node linkType: hard -"pg-protocol@npm:^1.6.1": - version: 1.6.1 - resolution: "pg-protocol@npm:1.6.1" - checksum: 10/9af672208adae8214f55f5b4597c4699ab9946205a99863d3e2bb8d024fdab16711457b539bc366cc29040218aa87508cf61294b76d288f48881b973d9117bd6 +"pg-protocol@npm:^1.7.0": + version: 1.7.0 + resolution: "pg-protocol@npm:1.7.0" + checksum: 10/ffffdf74426c9357b57050f1c191e84447c0e8b2a701b3ab302ac7dd0eb27b862d92e5e3b2d38876a1051de83547eb9165d6a58b3a8e90bb050dae97f9993d54 languageName: node linkType: hard @@ -40630,14 +40622,14 @@ __metadata: languageName: node linkType: hard -"pg@npm:^8.8.0": - version: 8.12.0 - resolution: "pg@npm:8.12.0" +"pg@npm:^8.13.0": + version: 8.13.1 + resolution: "pg@npm:8.13.1" dependencies: pg-cloudflare: "npm:^1.1.1" - pg-connection-string: "npm:^2.6.4" - pg-pool: "npm:^3.6.2" - pg-protocol: "npm:^1.6.1" + pg-connection-string: "npm:^2.7.0" + pg-pool: "npm:^3.7.0" + pg-protocol: "npm:^1.7.0" pg-types: "npm:^2.1.0" pgpass: "npm:1.x" peerDependencies: @@ -40648,7 +40640,7 @@ __metadata: peerDependenciesMeta: pg-native: optional: true - checksum: 10/ce39af0e85d42bf5fc8dcc02c57b38d4cb203fea937688509a77c0b005a54d4821e5e5963a5663934d76994eab42381698f08a44e21544b4545fd9d142dcfd12 + checksum: 10/542aa49fcb37657cf5f779b4a31fe6eb336e683445ecca38e267eeb0ca85d873ffe51f04794f9f9e184187e9f74bf7895e932a0fa9507132ac0dfc76c7c73451 languageName: node linkType: hard @@ -44861,10 +44853,10 @@ __metadata: languageName: node linkType: hard -"safe-stable-stringify@npm:2.5.0, safe-stable-stringify@npm:^2.3.1, safe-stable-stringify@npm:^2.4.3": - version: 2.5.0 - resolution: "safe-stable-stringify@npm:2.5.0" - checksum: 10/2697fa186c17c38c3ca5309637b4ac6de2f1c3d282da27cd5e1e3c88eca0fb1f9aea568a6aabdf284111592c8782b94ee07176f17126031be72ab1313ed46c5c +"safe-stable-stringify@npm:^2.3.1": + version: 2.3.1 + resolution: "safe-stable-stringify@npm:2.3.1" + checksum: 10/8a6ed4e5fb80694970f1939538518c44a59c71c74305e12b5964cbe3850636212eddac881da1f676b0232015213676e07750fe75bc402afbfe29851c8b52381e languageName: node linkType: hard