Skip to content

Commit

Permalink
feat(recovery): add rollback implementations
Browse files Browse the repository at this point in the history
Signed-off-by: Yogesh01000100 <[email protected]>
  • Loading branch information
Yogesh01000100 committed Aug 29, 2024
1 parent 008dd96 commit ce9a179
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,41 +149,51 @@ export class CrashRecoveryManager {
this.log.info(`${fnTag} Checking crash status for session ${sessionId}`);

try {
const crashStatus = await this.checkCrash(sessionId);
if (crashStatus === CrashStatus.IN_RECOVERY) {
// create new occurrence
const crashOccurrence = new CrashOccurrence(
CrashStatus.IN_RECOVERY,
new Date(),
new Date(),
);
this.log.debug(crashOccurrence);
// todo manage occurrence
// call corresponding services via handler for crash recovery
const sessionData = sessionId.hasClientSessionData()
? sessionId.getClientSessionData()
: sessionId.getServerSessionData();

const lastSequenceNumber = BigInt(sessionData.lastSequenceNumber);
const maxRetries = BigInt(sessionData.maxRetries);

if (lastSequenceNumber > maxRetries) {
this.log.warn(
`${fnTag} Max retries exceeded for session ${sessionId}. Initiating rollback...`,
);
await this.initiateRollback(sessionId, true);
} else {
const sessionData = sessionId.hasClientSessionData()
? sessionId.getClientSessionData()
: sessionId.getServerSessionData();

if (!sessionData) {
throw new Error(`${fnTag}, session data is not correctly initialized`);
}

let attempts = 0;
let crashOccurrence: CrashOccurrence | undefined;

while (attempts < BigInt(sessionData.maxRetries)) {
const crashStatus = await this.checkCrash(sessionId);

if (crashStatus === CrashStatus.IN_RECOVERY) {
this.log.info(
`${fnTag} Attempting recovery for session ${sessionId}`,
`${fnTag} Crash detected. Attempting recovery for session ${sessionId}`,
);

if (!crashOccurrence) {
crashOccurrence = new CrashOccurrence(
CrashStatus.IN_RECOVERY,
new Date(),
new Date(),
);
} else {
crashOccurrence.lastUpdate = new Date();
}

await this.handleRecovery(sessionId);
this.log.info(`${fnTag} Recovery successful.`);

crashOccurrence.status = CrashStatus.RECOVERED;

return true;
}
crashOccurrence.status = CrashStatus.RECOVERED;
return true;
} else {
this.log.info(`${fnTag} No crash detected. No action required.`);
return false;
attempts++;
this.log.info(
`${fnTag} Retry attempt ${attempts} for session ${sessionId}`,
);
}

this.log.warn(`${fnTag} All retries exhausted. Initiating rollback.`);
await this.initiateRollback(sessionId, true);
return false;
} catch (error) {
this.log.error(`${fnTag} Error during crash resolution: ${error}`);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,26 @@ export class RollbackStrategyFactory {
const sessionData = session.hasClientSessionData()
? session.getClientSessionData()!
: session.getServerSessionData()!;
const rollbackLogEntry = new RollbackLogEntry();

if (!sessionData.hashes) {
this.log.debug(`${fnTag} Creating Stage0RollbackStrategy`);
return new Stage0RollbackStrategy();
return new Stage0RollbackStrategy(rollbackLogEntry);
} else if (
!sessionData.hashes.stage2 ||
Object.keys(sessionData.hashes.stage2).length === 0
) {
this.log.debug(`${fnTag} Creating Stage1RollbackStrategy`);
return new Stage1RollbackStrategy(this.bridgeManager);
return new Stage1RollbackStrategy(this.bridgeManager, rollbackLogEntry);
} else if (
!sessionData.hashes.stage3 ||
Object.keys(sessionData.hashes.stage3).length === 0
) {
this.log.debug(`${fnTag} Creating Stage2RollbackStrategy`);
return new Stage2RollbackStrategy();
return new Stage2RollbackStrategy(this.bridgeManager, rollbackLogEntry);
} else {
this.log.debug(`${fnTag} Creating Stage3RollbackStrategy`);
return new Stage3RollbackStrategy();
return new Stage3RollbackStrategy(this.bridgeManager, rollbackLogEntry);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import { RollbackLogEntry } from "../../../generated/proto/cacti/satp/v02/crash_

export class Stage0RollbackStrategy implements RollbackStrategy {
private log: Logger;
private rollbackLogEntry: RollbackLogEntry;

constructor() {
constructor(logEntry: RollbackLogEntry) {
this.log = LoggerProvider.getOrCreate({ label: "Stage0RollbackStrategy" });
this.rollbackLogEntry = logEntry;
}

// return a rollback state in all strategies
Expand All @@ -17,57 +19,52 @@ export class Stage0RollbackStrategy implements RollbackStrategy {

// check session exists
if (!session) {
this.log.error(`${fnTag} Session not found`);
return false;
throw new Error(`${fnTag}, session data is not correctly initialized`);
}
try {
// TODO record the rollback on the log. Implement RollbackLogEntry
this.log.debug("Persisting rollback log entry");

const rollbackLogEntry: RollbackLogEntry = new RollbackLogEntry();
rollbackLogEntry.sessionId = session.getSessionId();
rollbackLogEntry.stage = "Stage0";
rollbackLogEntry.timestamp = Date.now().toString();
rollbackLogEntry.action = "Rollback Initiated";
rollbackLogEntry.status = "SUCCESS";
rollbackLogEntry.details = "Stage 0 rollback executed without issues.";

await this.localLogs.saveRollbackLog(rollbackLogEntry); // log for the rollbackentry
this.rollbackLogEntry.sessionId = session.getSessionId();
this.rollbackLogEntry.stage = "Stage0";
this.rollbackLogEntry.timestamp = Date.now().toString();
this.rollbackLogEntry.action = "";
this.rollbackLogEntry.status = "SUCCESS";
this.rollbackLogEntry.details = "";

this.log.info(`Successfully rolled back Stage 0`);

const state: RollbackState = {
currentStage: "Stage0",
rollbackLogEntry: rollbackLogEntry,
rollbackLogEntry: this.rollbackLogEntry,
};
await this.rollbackLogs.create(state); // todo: log for the rollbackentry

return state;
} catch (error) {
this.log.error(`Failed to rollback Stage 0: ${error}`);
const rollbackLogEntry: RollbackLogEntry = new RollbackLogEntry();
rollbackLogEntry.sessionId = session.getSessionId(); // session Id from satp session data
rollbackLogEntry.stage = "Stage0";
rollbackLogEntry.timestamp = Date.now().toString();
rollbackLogEntry.action = "Rollback Failed";
rollbackLogEntry.status = "FAILURE";
rollbackLogEntry.details = "Stage 0 rollback executed without issues.";

await this.localLogs.saveRollbackLog(rollbackLogEntry); // todo: implement the correct log support
this.rollbackLogEntry.sessionId = session.getSessionId();
this.rollbackLogEntry.stage = "Stage0";
this.rollbackLogEntry.timestamp = Date.now().toString();
this.rollbackLogEntry.action = "";
this.rollbackLogEntry.status = "FAILURE";
this.rollbackLogEntry.details = "";

const state: RollbackState = {
currentStage: "Stage0",
rollbackLogEntry: rollbackLogEntry,
rollbackLogEntry: this.rollbackLogEntry,
};
await this.rollbackLogs.create(state); // todo: implement the correct log support
return state;
}
}

async cleanup(session: SATPSession): Promise<RollbackState> {
const fnTag = "Stage0RollbackStrategy#cleanup";
// for stage 0, do nothing
const rollbackLogEntry: RollbackLogEntry = new RollbackLogEntry();
const state: RollbackState = {
currentStage: "Stage0",
rollbackLogEntry: rollbackLogEntry,
};
if (!session) {
this.log.error(`${fnTag} Session not found`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,48 +7,48 @@ import { SATPBridgeManager } from "../../stage-services/satp-bridge/satp-bridge-
export class Stage1RollbackStrategy implements RollbackStrategy {
private log: Logger;
private bridgeManager: SATPBridgeManager;
private rollbackLogEntry: RollbackLogEntry;

constructor(bridgeManager: SATPBridgeManager) {
constructor(bridgeManager: SATPBridgeManager, logEntry: RollbackLogEntry) {
this.log = LoggerProvider.getOrCreate({ label: "Stage1RollbackStrategy" });
this.bridgeManager = bridgeManager;
this.rollbackLogEntry = logEntry;
}

async execute(session: SATPSession): Promise<RollbackState> {
const fnTag = "Stage1RollbackStrategy#execute";
this.log.info(`${fnTag} Executing rollback for Stage 1`);

if (!session) {
this.log.error(`${fnTag} Session not found`);
return false;
throw new Error(`${fnTag}, session data is not correctly initialized`);
}

try {
// TODO: Implement Stage 1 specific rollback logic

// TODO: Record the rollback on the log. Implement RollbackLogEntry

const receipt = await this.bridgeManager.unwrapAsset("assetId"); // unwrap
const receipt = await this.bridgeManager.unwrapAsset("assetId");

this.log.info(`${fnTag}, Asset unlocked: ${receipt}`);

const rollbackLogEntry: RollbackLogEntry = {
sessionId: session.getSessionId(),
stage: "Stage1",
timestamp: new Date().toString(),
action: "Unwrap Asset",
status: "SUCCESS",
details: `Asset unwrap receipt: ${receipt}`,
};
this.rollbackLogEntry.sessionId = session.getSessionId();
this.rollbackLogEntry.stage = "Stage1";
this.rollbackLogEntry.timestamp = Date.now().toString();
this.rollbackLogEntry.action = "UNWRAP";
this.rollbackLogEntry.status = "SUCCESS";
this.rollbackLogEntry.details = "";

this.log.debug("Persisting rollback log entry");

this.log.info(`Successfully rolled back Stage 1`);

const state: RollbackState = {
currentStage: "Stage1",
rollbackLogEntry: rollbackLogEntry,
rollbackLogEntry: this.rollbackLogEntry,
};

await this.rollbackLogs.create(state); // todo: log support
return state;
} catch (error) {
this.log.error(`Failed to rollback Stage 1: ${error}`);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
import { Logger, LoggerProvider } from "@hyperledger/cactus-common";
import { SATPSession } from "./satp-session";
import { SATPSession } from "../../satp-session";
import { RollbackState, RollbackStrategy } from "./rollback-strategy-factory";
import { RollbackLogEntry } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb";
import { SATPBridgeManager } from "../../stage-services/satp-bridge/satp-bridge-manager";

export class Stage2RollbackStrategy implements RollbackStrategy {
private log: Logger;
private bridgeManager: SATPBridgeManager;
private rollbackLogEntry: RollbackLogEntry;

constructor() {
constructor(bridgeManager: SATPBridgeManager, logEntry: RollbackLogEntry) {
this.log = LoggerProvider.getOrCreate({ label: "Stage2RollbackStrategy" });
this.bridgeManager = bridgeManager;
this.rollbackLogEntry = logEntry;
}

async execute(session: SATPSession): Promise<RollbackState> {
const fnTag = "Stage2RollbackStrategy#execute";
this.log.info(`${fnTag} Executing rollback for Stage 2`);

if (!session) {
this.log.error(`${fnTag} Session not found`);
return false;
throw new Error(`${fnTag}, session data is not correctly initialized`);
}

try {
Expand All @@ -24,8 +29,25 @@ export class Stage2RollbackStrategy implements RollbackStrategy {
// TODO: Record the rollback on the log. Implement RollbackLogEntry
this.log.debug("Persisting rollback log entry");

const receipt = await this.bridgeManager.unlockAsset("assetId", Number());

this.log.info(`${fnTag}, Asset unlocked: ${receipt}`);
this.rollbackLogEntry.sessionId = session.getSessionId();
this.rollbackLogEntry.stage = "Stage2";
this.rollbackLogEntry.timestamp = Date.now().toString();
this.rollbackLogEntry.action = "UNLOCK";
this.rollbackLogEntry.status = "SUCCESS";
this.rollbackLogEntry.details = "";

this.log.info(`Successfully rolled back Stage 2`);
return true;

const state: RollbackState = {
currentStage: "Stage2",
rollbackLogEntry: this.rollbackLogEntry,
};

await this.rollbackLogs.create(state); // todo: log support
return state;
} catch (error) {
this.log.error(`Failed to rollback Stage 2: ${error}`);
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,54 @@
import { Logger, LoggerProvider } from "@hyperledger/cactus-common";
import { SATPSession } from "./satp-session";
import { SATPSession } from "../../satp-session";
import { RollbackState, RollbackStrategy } from "./rollback-strategy-factory";
import { RollbackLogEntry } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb";
import { SATPBridgeManager } from "../../stage-services/satp-bridge/satp-bridge-manager";

export class Stage3RollbackStrategy implements RollbackStrategy {
private log: Logger;
private bridgeManager: SATPBridgeManager;
private rollbackLogEntry: RollbackLogEntry;

constructor() {
constructor(bridgeManager: SATPBridgeManager, logEntry: RollbackLogEntry) {
this.log = LoggerProvider.getOrCreate({ label: "Stage3RollbackStrategy" });
this.bridgeManager = bridgeManager;
this.rollbackLogEntry = logEntry;
}

async execute(session: SATPSession): Promise<RollbackState> {
const fnTag = "Stage3RollbackStrategy#execute";
this.log.info(`${fnTag} Executing rollback for Stage 3`);

if (!session) {
this.log.error(`${fnTag} Session not found`);
return false;
throw new Error(`${fnTag}, session data is not correctly initialized`);
}

try {
// TODO: Implement Stage 3 specific rollback logic

// TODO: Record the rollback on the log. Implement RollbackLogEntry

const receipt = await this.bridgeManager.burnAsset("assetId", Number());

this.log.info(`${fnTag}, Asset unlocked: ${receipt}`);

this.rollbackLogEntry.sessionId = session.getSessionId();
this.rollbackLogEntry.stage = "Stage3";
this.rollbackLogEntry.timestamp = Date.now().toString();
this.rollbackLogEntry.action = "BURN";
this.rollbackLogEntry.status = "SUCCESS";
this.rollbackLogEntry.details = "";

this.log.debug("Persisting rollback log entry");

this.log.info(`Successfully rolled back Stage 3`);
return true;
const state: RollbackState = {
currentStage: "Stage3",
rollbackLogEntry: this.rollbackLogEntry,
};

await this.rollbackLogs.create(state); // todo: log support
return state;
} catch (error) {
this.log.error(`Failed to rollback Stage 3: ${error}`);
return false;
Expand Down

0 comments on commit ce9a179

Please sign in to comment.