From cb24d53a31aeb84573943c6a27e9412e9f61f1c9 Mon Sep 17 00:00:00 2001 From: Yogesh01000100 Date: Fri, 15 Nov 2024 11:38:56 +0000 Subject: [PATCH] feat: add cron schedule for periodic crash checks Signed-off-by: Yogesh01000100 --- .../cactus-plugin-satp-hermes/package.json | 3 + .../typescript/core/recovery/crash-manager.ts | 139 ++++++++++------ .../core/recovery/crash-recovery-handler.ts | 7 +- .../src/main/typescript/core/satp-session.ts | 23 +++ .../typescript/unit/recovery/cron.test.ts | 156 ++++++++++++++++++ .../typescript/unit/recovery/logging.test.ts | 6 +- yarn.lock | 26 +++ 7 files changed, 302 insertions(+), 58 deletions(-) create mode 100644 packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/cron.test.ts diff --git a/packages/cactus-plugin-satp-hermes/package.json b/packages/cactus-plugin-satp-hermes/package.json index b07941a044b..214ae92c2b2 100644 --- a/packages/cactus-plugin-satp-hermes/package.json +++ b/packages/cactus-plugin-satp-hermes/package.json @@ -132,9 +132,11 @@ "jsonc": "2.0.0", "knex": "2.4.0", "kubo-rpc-client": "3.0.1", + "node-cron": "3.0.2", "npm-run-all": "4.1.5", "openzeppelin-solidity": "3.4.2", "pg": "^8.13.0", + "safe-stable-stringify": "2.5.0", "secp256k1": "4.0.3", "socket.io": "4.6.2", "sqlite3": "5.1.5", @@ -157,6 +159,7 @@ "@types/fs-extra": "11.0.4", "@types/google-protobuf": "3.15.5", "@types/node": "18.18.2", + "@types/node-cron": "3.0.11", "@types/pg": "8.6.5", "@types/swagger-ui-express": "4.1.6", "@types/tape": "4.13.4", diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts index ffb51baff02..617e885b826 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts @@ -23,6 +23,7 @@ import { } from "../../generated/proto/cacti/satp/v02/crash_recovery_pb"; import { SessionType } from "../session-utils"; import { ISATPBridgesOptions } from "../../gol/satp-bridges-manager"; +import cron from "node-cron"; export enum CrashStatus { IN_RECOVERY = "IN_RECOVERY", @@ -50,7 +51,7 @@ export class CrashRecoveryManager { public static readonly CLASS_NAME = "CrashRecoveryManager"; private readonly log: Logger; private readonly instanceId: string; - private sessions: Map; + private sessions: Map; private crashRecoveryHandler: CrashRecoveryHandler; private factory: RollbackStrategyFactory; private logRepository: ILocalLogRepository; @@ -63,7 +64,7 @@ export class CrashRecoveryManager { const label = this.className; this.log = LoggerProvider.getOrCreate({ level, label }); this.instanceId = options.instanceId; - this.sessions = new Map(); + this.sessions = new Map(); this.log.info(`Instantiated ${this.className} OK`); this.logRepository = new LocalLogRepository(options.knexConfig); this.factory = new RollbackStrategyFactory( @@ -103,67 +104,39 @@ export class CrashRecoveryManager { const sessionId = log.sessionID; this.log.info(`${fnTag}, recovering session: ${sessionId}`); - if (log == undefined || log.data == undefined) { - throw new Error(`${fnTag}, invalid log}`); + if (!log || !log.data) { + throw new Error(`${fnTag}, invalid log`); } try { - const logEntry: SessionData = JSON.parse(log.data); - this.sessions.set(sessionId, logEntry); + const sessionData: SessionData = JSON.parse(log.data); + const satpSession = SATPSession.fromSessionData(sessionData); + this.sessions.set(sessionId, satpSession); } catch (error) { this.log.error( `Error parsing log data for session Id: ${sessionId}: ${error}`, ); } } + this.detectCrash(); } catch (error) { this.log.error(`Error initializing sessions: ${error}`); } } - private async checkCrash(session: SATPSession): Promise { - const fnTag = `${this.className}#checkCrash()`; - const sessionData = session.hasClientSessionData() - ? session.getClientSessionData() - : session.getServerSessionData(); - - try { - session.verify( - fnTag, - session.hasClientSessionData() - ? SessionType.CLIENT - : SessionType.SERVER, - ); - - const lastLog = await this.logRepository.readLastestLog( - session.getSessionId(), - ); - - if (lastLog && lastLog.operation !== "done") { - this.log.debug( - `${fnTag} Crash detected for session ID: ${session.getSessionId()} last log operation: ${lastLog.operation}`, - ); - return CrashStatus.IN_RECOVERY; - } - - const logTimestamp = new Date(lastLog?.timestamp ?? 0).getTime(); - const currentTime = new Date().getTime(); - const timeDifference = currentTime - logTimestamp; - - if (timeDifference > Number(sessionData.maxTimeout)) { - this.log.warn( - `${fnTag} Timeout exceeded by ${timeDifference} ms for session ID: ${session.getSessionId()}`, - ); - return CrashStatus.IN_RECOVERY; - } + private detectCrash() { + const fnTag = `${this.className}#startCrashDetectionCron()`; + cron.schedule("*/10 * * * * *", async () => { + this.log.debug(`${fnTag} Running crash detection cron job.`); + // helper function + await this.checkAndResolveCrashes(); + }); + this.log.info(`${fnTag} Crash detection cron job scheduled.`); + } - this.log.info( - `${fnTag} No crash detected for session ID: ${session.getSessionId()}`, - ); - return CrashStatus.NO_CRASH; - } catch (error) { - this.log.error(`${fnTag} Error occured !`); - return CrashStatus.ERROR; + public async checkAndResolveCrashes(): Promise { + for (const session of this.sessions.values()) { + await this.checkAndResolveCrash(session); } } @@ -212,16 +185,16 @@ export class CrashRecoveryManager { `${fnTag} Retry attempt ${attempts} for sessionID: ${session.getSessionId()}`, ); } - if (attempts != 0) { - this.log.warn(`${fnTag} All retries exhausted ! Initiating Rollback`); + if (attempts !== 0) { + this.log.warn(`${fnTag} All retries exhausted! Initiating Rollback`); const rollBackStatus = await this.initiateRollback(session, true); if (rollBackStatus) { this.log.info( - `${fnTag} rollback was success: ${session.getSessionId()}`, + `${fnTag} Rollback was successful for sessionID: ${session.getSessionId()}`, ); } else { this.log.error( - `${fnTag} rollback failed ! ${session.getSessionId()}`, + `${fnTag} Rollback failed for sessionID: ${session.getSessionId()}`, ); } } @@ -230,6 +203,52 @@ export class CrashRecoveryManager { } } + private async checkCrash(session: SATPSession): Promise { + const fnTag = `${this.className}#checkCrash()`; + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + try { + session.verify( + fnTag, + session.hasClientSessionData() + ? SessionType.CLIENT + : SessionType.SERVER, + ); + + const lastLog = await this.logRepository.readLastestLog( + session.getSessionId(), + ); + + if (lastLog && lastLog.operation !== "done") { + this.log.debug( + `${fnTag} Crash detected for session ID: ${session.getSessionId()}, last log operation: ${lastLog.operation}`, + ); + return CrashStatus.IN_RECOVERY; + } + + const logTimestamp = new Date(lastLog?.timestamp ?? 0).getTime(); + const currentTime = new Date().getTime(); + const timeDifference = currentTime - logTimestamp; + + if (timeDifference > Number(sessionData.maxTimeout)) { + this.log.warn( + `${fnTag} Timeout exceeded by ${timeDifference} ms for session ID: ${session.getSessionId()}`, + ); + return CrashStatus.IN_RECOVERY; + } + + this.log.info( + `${fnTag} No crash detected for session ID: ${session.getSessionId()}`, + ); + return CrashStatus.NO_CRASH; + } catch (error) { + this.log.error(`${fnTag} Error occurred during crash check: ${error}`); + return CrashStatus.ERROR; + } + } + public async handleRecovery(session: SATPSession): Promise { const fnTag = `${this.className}#handleRecovery()`; @@ -316,12 +335,23 @@ export class CrashRecoveryManager { return false; } - let reconstructedSessionData = new SessionData(); + let reconstructedSessionData: SessionData | undefined; for (const logEntry of allLogs) { const data = JSON.parse(logEntry.data); reconstructedSessionData = data; - this.sessions.set(sessionId, reconstructedSessionData); + + if (reconstructedSessionData) { + // Reconstruct SATPSession from SessionData + const satpSession = SATPSession.fromSessionData( + reconstructedSessionData, + ); + this.sessions.set(sessionId, satpSession); + } else { + this.log.error( + `${fnTag} Reconstructed session data is undefined for session ID: ${sessionId}`, + ); + } } this.log.info( @@ -396,6 +426,7 @@ export class CrashRecoveryManager { return await strategy.execute(session); } catch (error) { this.log.error(`${fnTag} Error executing rollback strategy: ${error}`); + return undefined; } } diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts index 3872e66e48e..3c67fb087bd 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts @@ -14,20 +14,21 @@ import { ILoggerOptions, } from "@hyperledger/cactus-common"; import { Empty } from "@bufbuild/protobuf"; -import { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb"; +//import { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb"; import { ILocalLogRepository } from "../../repository/interfaces/repository"; import { getSatpLogKey } from "../../gateway-utils"; +import { SATPSession } from "../satp-session"; interface HandlerOptions { crashService: CrashRecoveryService; loggerOptions: ILoggerOptions; - sessions: Map; + sessions: Map; logRepository: ILocalLogRepository; } export class CrashRecoveryHandler { public static readonly CLASS_NAME = "CrashRecoveryHandler"; - public sessions: Map; + public sessions: Map; private service: CrashRecoveryService; private log: Logger; private logRepository: ILocalLogRepository; diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/satp-session.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/satp-session.ts index 47b5fd5ae94..0b189008f7b 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/satp-session.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/satp-session.ts @@ -128,6 +128,29 @@ export class SATPSession { return this.clientSessionData; } + public static fromSessionData(sessionData: SessionData): SATPSession { + // Determine if it's a client or server session based on the presence of gateway pubkeys + const isServer = sessionData.serverGatewayPubkey !== ""; + const isClient = sessionData.clientGatewayPubkey !== ""; + + const session = new SATPSession({ + contextID: sessionData.transferContextId, + sessionID: sessionData.id, + server: isServer, + client: isClient, + }); + + // Assign the sessionData to the appropriate property + if (isServer) { + session.serverSessionData = sessionData; + } + if (isClient) { + session.clientSessionData = sessionData; + } + + return session; + } + public createSessionData( type: SessionType, sessionId: string, diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/cron.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/cron.test.ts new file mode 100644 index 00000000000..55168e48dd2 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/cron.test.ts @@ -0,0 +1,156 @@ +import "jest-extended"; +import { CrashRecoveryManager } from "../../../../main/typescript/core/recovery/crash-manager"; +import { LogLevelDesc, Secp256k1Keys } from "@hyperledger/cactus-common"; +import { ICrashRecoveryManagerOptions } from "../../../../main/typescript/core/recovery/crash-manager"; +import knex from "knex"; +import { + LocalLog, + SupportedChain, +} from "../../../../main/typescript/core/types"; +import { + Asset, + CredentialProfile, + LockType, + SignatureAlgorithm, +} from "../../../../main/typescript/generated/proto/cacti/satp/v02/common/message_pb"; +import { v4 as uuidv4 } from "uuid"; +import { SATP_VERSION } from "../../../../main/typescript/core/constants"; +import { SATPSession } from "../../../../main/typescript/core/satp-session"; +import { knexClientConnection } from "../../knex.config"; +import { getSatpLogKey } from "../../../../main/typescript/gateway-utils"; +import { TokenType } from "../../../../main/typescript/core/stage-services/satp-bridge/types/asset"; + +const logLevel: LogLevelDesc = "DEBUG"; + +let mockSession: SATPSession; +const keyPairs = Secp256k1Keys.generateKeyPairsBuffer(); +const sessionId = uuidv4(); + +const createMockSession = (maxTimeout: string, maxRetries: string) => { + const mockSession = new SATPSession({ + contextID: "MOCK_CONTEXT_ID", + server: false, + client: true, + }); + + const sessionData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + sessionData.id = sessionId; + sessionData.maxTimeout = maxTimeout; + sessionData.maxRetries = maxRetries; + sessionData.version = SATP_VERSION; + sessionData.clientGatewayPubkey = Buffer.from(keyPairs.publicKey).toString( + "hex", + ); + sessionData.serverGatewayPubkey = sessionData.clientGatewayPubkey; + sessionData.originatorPubkey = "MOCK_ORIGINATOR_PUBKEY"; + sessionData.beneficiaryPubkey = "MOCK_BENEFICIARY_PUBKEY"; + sessionData.digitalAssetId = "MOCK_DIGITAL_ASSET_ID"; + sessionData.assetProfileId = "MOCK_ASSET_PROFILE_ID"; + sessionData.receiverGatewayOwnerId = "MOCK_RECEIVER_GATEWAY_OWNER_ID"; + sessionData.recipientGatewayNetworkId = SupportedChain.FABRIC; + sessionData.senderGatewayOwnerId = "MOCK_SENDER_GATEWAY_OWNER_ID"; + sessionData.senderGatewayNetworkId = SupportedChain.BESU; + sessionData.signatureAlgorithm = SignatureAlgorithm.RSA; + sessionData.lockType = LockType.FAUCET; + sessionData.lockExpirationTime = BigInt(1000); + sessionData.credentialProfile = CredentialProfile.X509; + sessionData.loggingProfile = "MOCK_LOGGING_PROFILE"; + sessionData.accessControlProfile = "MOCK_ACCESS_CONTROL_PROFILE"; + sessionData.resourceUrl = "MOCK_RESOURCE_URL"; + sessionData.lockAssertionExpiration = BigInt(99999); + sessionData.receiverContractOntology = "MOCK_RECEIVER_CONTRACT_ONTOLOGY"; + sessionData.senderContractOntology = "MOCK_SENDER_CONTRACT_ONTOLOGY"; + sessionData.sourceLedgerAssetId = "MOCK_SOURCE_LEDGER_ASSET_ID"; + sessionData.senderAsset = new Asset(); + sessionData.senderAsset.tokenId = "MOCK_TOKEN_ID"; + sessionData.senderAsset.tokenType = TokenType.ERC20; + sessionData.senderAsset.amount = BigInt(0); + sessionData.senderAsset.owner = "MOCK_SENDER_ASSET_OWNER"; + sessionData.senderAsset.ontology = "MOCK_SENDER_ASSET_ONTOLOGY"; + sessionData.senderAsset.contractName = "MOCK_SENDER_ASSET_CONTRACT_NAME"; + sessionData.senderAsset.contractAddress = + "MOCK_SENDER_ASSET_CONTRACT_ADDRESS"; + sessionData.receiverAsset = new Asset(); + + sessionData.receiverAsset.tokenType = TokenType.ERC20; + sessionData.receiverAsset.amount = BigInt(0); + sessionData.receiverAsset.owner = "MOCK_RECEIVER_ASSET_OWNER"; + sessionData.receiverAsset.ontology = "MOCK_RECEIVER_ASSET_ONTOLOGY"; + sessionData.receiverAsset.contractName = "MOCK_RECEIVER_ASSET_CONTRACT_NAME"; + sessionData.receiverAsset.mspId = "MOCK_RECEIVER_ASSET_MSP_ID"; + sessionData.receiverAsset.channelName = "MOCK_CHANNEL_ID"; + sessionData.lastSequenceNumber = BigInt(4); + + return mockSession; +}; +let crashManager: CrashRecoveryManager; + +beforeAll(async () => { + const knexInstance = knex(knexClientConnection); + await knexInstance.migrate.latest(); + + const crashManagerOptions: ICrashRecoveryManagerOptions = { + instanceId: uuidv4(), + logLevel: logLevel, + knexConfig: knexClientConnection, + bridgeConfig: { + logLevel: logLevel, + networks: [], + supportedDLTs: [SupportedChain.BESU, SupportedChain.FABRIC], + }, + }; + + crashManager = new CrashRecoveryManager(crashManagerOptions); +}); + +afterEach(() => { + jest.clearAllMocks(); + jest.useRealTimers(); +}); + +describe("CrashRecoveryManager Tests", () => { + it("should trigger checkAndResolveCrashes via cron schedule every 10 seconds for 30 seconds", async () => { + jest.useFakeTimers(); + + mockSession = createMockSession("10000", "3"); + const sessionData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + const sessionId = sessionData.id; + const key = getSatpLogKey(sessionId, "type", "operation"); + const mockLogEntry: LocalLog = { + sessionID: sessionId, + type: "type", + key: key, + operation: "operation", + timestamp: new Date().toISOString(), + data: JSON.stringify(sessionData), + }; + const mockLogRepository = crashManager["logRepository"]; + + await mockLogRepository.create(mockLogEntry); + + const mockCheckAndResolveCrash = jest + .spyOn(CrashRecoveryManager.prototype, "checkAndResolveCrash") + .mockImplementation(() => Promise.resolve()); + + await crashManager.recoverSessions(); + + for (let i = 1; i <= 3; i++) { + jest.advanceTimersByTime(10000); + await Promise.resolve(); + } + + expect(mockCheckAndResolveCrash).toHaveBeenCalledTimes(3); + expect(mockCheckAndResolveCrash).toHaveBeenCalledWith( + expect.any(SATPSession), + ); + + mockCheckAndResolveCrash.mockRestore(); + jest.useRealTimers(); + }); +}); diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/logging.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/logging.test.ts index 8aa6ba39028..7fc34246c37 100644 --- a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/logging.test.ts +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/logging.test.ts @@ -146,7 +146,11 @@ describe("CrashRecoveryManager Tests", () => { if (recoveredSession) { const parsedSessionData: SessionData = JSON.parse(mockLogEntry.data); - expect(recoveredSession).toEqual(parsedSessionData); + const sessionData = recoveredSession.hasClientSessionData() + ? recoveredSession.getClientSessionData() + : recoveredSession.getServerSessionData(); + + expect(sessionData).toEqual(parsedSessionData); } }); diff --git a/yarn.lock b/yarn.lock index 19f3e80e6ad..047a1f590a5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -9112,6 +9112,7 @@ __metadata: "@types/fs-extra": "npm:11.0.4" "@types/google-protobuf": "npm:3.15.5" "@types/node": "npm:18.18.2" + "@types/node-cron": "npm:3.0.11" "@types/pg": "npm:8.6.5" "@types/swagger-ui-express": "npm:4.1.6" "@types/tape": "npm:4.13.4" @@ -9140,10 +9141,12 @@ __metadata: knex: "npm:2.4.0" kubo-rpc-client: "npm:3.0.1" make-dir-cli: "npm:3.1.0" + node-cron: "npm:3.0.2" npm-run-all: "npm:4.1.5" openzeppelin-solidity: "npm:3.4.2" pg: "npm:^8.13.0" protobufjs: "npm:7.2.5" + safe-stable-stringify: "npm:2.5.0" secp256k1: "npm:4.0.3" socket.io: "npm:4.6.2" sqlite3: "npm:5.1.5" @@ -15892,6 +15895,13 @@ __metadata: languageName: node linkType: hard +"@types/node-cron@npm:3.0.11": + version: 3.0.11 + resolution: "@types/node-cron@npm:3.0.11" + checksum: 10/a73f69bcca52a5f3b1671cfb00a8e4a1d150d0aef36a611564a2f94e66b6981bade577e267ceeeca6fcee241768902d55eb8cf3a81f9ef4ed767a23112fdb16d + languageName: node + linkType: hard + "@types/node-fetch@npm:2.6.2": version: 2.6.2 resolution: "@types/node-fetch@npm:2.6.2" @@ -38210,6 +38220,15 @@ __metadata: languageName: node linkType: hard +"node-cron@npm:3.0.2": + version: 3.0.2 + resolution: "node-cron@npm:3.0.2" + dependencies: + uuid: "npm:8.3.2" + checksum: 10/71d4ce22425d0f2a7bd9753149da475317725f6890d5a55e5e43a97062456faeda984d55c17aee9699552460525cee948662fb75124065810e4038b2f56a9d32 + languageName: node + linkType: hard + "node-domexception@npm:^1.0.0": version: 1.0.0 resolution: "node-domexception@npm:1.0.0" @@ -44853,6 +44872,13 @@ __metadata: languageName: node linkType: hard +"safe-stable-stringify@npm:2.5.0": + version: 2.5.0 + resolution: "safe-stable-stringify@npm:2.5.0" + checksum: 10/2697fa186c17c38c3ca5309637b4ac6de2f1c3d282da27cd5e1e3c88eca0fb1f9aea568a6aabdf284111592c8782b94ee07176f17126031be72ab1313ed46c5c + languageName: node + linkType: hard + "safe-stable-stringify@npm:^2.3.1": version: 2.3.1 resolution: "safe-stable-stringify@npm:2.3.1"