Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: updated org model and sync registries to prevent locking organi… #1230

Merged
merged 2 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/models/audit/audit.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { AuditMirror } from './audit.model.mirror';
import ModelTypes from './audit.modeltypes.cjs';
import findDuplicateIssuancesSql from './sql/find-duplicate-issuances.sql.js';
import { Organization } from '../organizations/index.js';
import { waitForSyncRegistries } from '../../utils/model-utils.js';
import { waitForSyncRegistriesTransaction } from '../../utils/model-utils.js';

class Audit extends Model {
static async create(values, options) {
Expand Down Expand Up @@ -109,7 +109,7 @@ Audit.init(ModelTypes, {
});

Audit.addHook('beforeFind', async () => {
await waitForSyncRegistries();
await waitForSyncRegistriesTransaction();
});

export { Audit };
5 changes: 0 additions & 5 deletions src/models/organizations/organizations.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import { getConfig } from '../../utils/config-loader';
const { USE_SIMULATOR, AUTO_SUBSCRIBE_FILESTORE } = getConfig().APP;

import ModelTypes from './organizations.modeltypes.cjs';
import { waitForSyncRegistries } from '../../utils/model-utils.js';

class Organization extends Model {
static async getHomeOrg(includeAddress = true) {
Expand Down Expand Up @@ -470,8 +469,4 @@ Organization.init(ModelTypes, {
timestamps: true,
});

Organization.addHook('beforeFind', async () => {
await waitForSyncRegistries();
});

export { Organization };
102 changes: 79 additions & 23 deletions src/tasks/sync-registries.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
generateGenerationIndex,
} from '../utils/sync-migration-utils';
import {
processingUpdateTransactionMutex,
processingSyncRegistriesTransactionMutex,
syncRegistriesTaskMutex,
} from '../utils/model-utils.js';

Expand Down Expand Up @@ -89,19 +89,63 @@ const processJob = async () => {
});

for (const organization of organizations) {
await syncOrganizationAudit(organization);
if (CONFIG.USE_SIMULATOR || process.env.NODE_ENV === 'test') {
await syncOrganizationAudit(organization);
} else {
const mostRecentOrgAuditRecord = await Audit.findOne({
where: {
orgUid: organization.orgUid,
},
order: [['createdAt', 'DESC']],
limit: 1,
raw: true,
});

// verify that the latest organization root hash is up to date with the audit records. attempt correction.
if (mostRecentOrgAuditRecord.rootHash !== organization.registryHash) {
logger.warn(
`latest root hash in org table for organization ${organization.name} (orgUid ${organization.orgUid}) does not match the audit records. attempting to correct`,
);
try {
const result = await Organization.update(
{ registryHash: mostRecentOrgAuditRecord.rootHash },
{
where: { orgUid: organization.orgUid },
},
);

if (result?.length) {
logger.info(
`registry hash record corrected for ${organization.name} (orgUid ${organization.orgUid}). proceeding with audit sync`,
);
const correctedOrganizationRecord = await Organization.findOne({
where: { orgUid: organization.orgUid },
});

await syncOrganizationAudit(correctedOrganizationRecord);
} else {
throw new Error('organizations update query affected 0 records');
}
} catch (error) {
logger.error(
`failed to update organization table record for ${organization.name} (orgUid ${organization.orgUid}) with correct root hash. Something is wrong. Skipping audit sync and trying again shortly. Error: ${error}`,
);
}
} else {
// normal state, proceed with audit sync
await syncOrganizationAudit(organization);
}
}
}
};

async function createTransaction(callback, afterCommitCallbacks) {
let result = null;

async function createAndProcessTransaction(callback, afterCommitCallbacks) {
let transaction;
let mirrorTransaction;

logger.info('Starting sequelize transaction and acquiring transaction mutex');
const releaseTransactionMutex =
await processingUpdateTransactionMutex.acquire();
await processingSyncRegistriesTransactionMutex.acquire();

try {
// Start a transaction
Expand All @@ -112,7 +156,7 @@ async function createTransaction(callback, afterCommitCallbacks) {
}

// Execute the provided callback with the transaction
result = await callback(transaction, mirrorTransaction);
await callback(transaction, mirrorTransaction);

// Commit the transaction if the callback completes without errors
await transaction.commit();
Expand All @@ -127,13 +171,15 @@ async function createTransaction(callback, afterCommitCallbacks) {

logger.info('Commited sequelize transaction');

return result;
return true;
} catch (error) {
// Roll back the transaction if an error occurs
if (transaction) {
logger.error('Rolling back transaction');
console.error(error);
logger.error(
`encountered error syncing organization audit. Rolling back transaction. Error: ${error}`,
);
await transaction.rollback();
return false;
}
} finally {
releaseTransactionMutex();
Expand Down Expand Up @@ -439,7 +485,7 @@ const syncOrganizationAudit = async (organization) => {
// by not processing the DELETE for that record.
const optimizedKvDiff = optimizeAndSortKvDiff(kvDiff);

const updateTransaction = async (transaction, mirrorTransaction) => {
const updateAuditTransaction = async (transaction, mirrorTransaction) => {
logger.info(
`Syncing ${organization.name} generation ${toBeProcessedDatalayerGenerationIndex} (orgUid ${organization.orgUid}, registryId ${organization.registryId})`,
);
Expand Down Expand Up @@ -536,18 +582,8 @@ const syncOrganizationAudit = async (organization) => {
}

// Create the Audit record
logger.debug(
`creating audit model entry for ${organization.name} transacton`,
);
logger.debug(`creating audit model transaction entry`);
await Audit.create(auditData, { transaction, mirrorTransaction });
await Organization.update(
{ registryHash: rootToBeProcessed.root_hash },
{
where: { orgUid: organization.orgUid },
transaction,
mirrorTransaction,
},
);
}
}
};
Expand All @@ -556,7 +592,27 @@ const syncOrganizationAudit = async (organization) => {
afterCommitCallbacks.push(truncateStaging);
}

await createTransaction(updateTransaction, afterCommitCallbacks);
const transactionSucceeded = await createAndProcessTransaction(
updateAuditTransaction,
afterCommitCallbacks,
);

if (transactionSucceeded) {
logger.debug(
`updateAuditTransaction successfully completed and committed audit updates for ${organization.name} (orgUid: ${organization.orgUid}, registryId: ${organization.registryId}) generation index ${toBeProcessedDatalayerGenerationIndex}. updating registry hash to ${rootToBeProcessed.root_hash}`,
);

await Organization.update(
{ registryHash: rootToBeProcessed.root_hash },
{
where: { orgUid: organization.orgUid },
},
);
} else {
logger.debug(
`updateAuditTransaction failed to complete and commit audit updates for ${organization.name} (orgUid: ${organization.orgUid}, registryId: ${organization.registryId}) generation index ${toBeProcessedDatalayerGenerationIndex}`,
);
}
} catch (error) {
logger.error('Error syncing org audit', error);
}
Expand Down
9 changes: 5 additions & 4 deletions src/utils/model-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ import Sequelize from 'sequelize';

import { Mutex } from 'async-mutex';

export async function waitForSyncRegistries() {
if (processingUpdateTransactionMutex.isLocked()) {
export async function waitForSyncRegistriesTransaction() {
if (processingSyncRegistriesTransactionMutex.isLocked()) {
// when the mutex is acquired, the current sync transaction has completed
const releaseMutex = await processingUpdateTransactionMutex.acquire();
const releaseMutex =
await processingSyncRegistriesTransactionMutex.acquire();
await releaseMutex();
}
}
Expand All @@ -24,7 +25,7 @@ export const syncRegistriesTaskMutex = new Mutex();
* audit model update transactions are large and lock the DB for long periods.
* @type {Mutex}
*/
export const processingUpdateTransactionMutex = new Mutex();
export const processingSyncRegistriesTransactionMutex = new Mutex();

export function formatModelAssociationName(model) {
if (model == null || model.model == null) return '';
Expand Down