Skip to content

Commit

Permalink
feat: updated org model and sync registries to prevent locking organi…
Browse files Browse the repository at this point in the history
…zation table during audit sync
  • Loading branch information
wwills2 committed Nov 22, 2024
1 parent 23a245e commit a3f33f7
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 30 deletions.
4 changes: 2 additions & 2 deletions src/models/audit/audit.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { AuditMirror } from './audit.model.mirror';
import ModelTypes from './audit.modeltypes.cjs';
import findDuplicateIssuancesSql from './sql/find-duplicate-issuances.sql.js';
import { Organization } from '../organizations/index.js';
import { waitForSyncRegistries } from '../../utils/model-utils.js';
import { waitForSyncRegistriesTransaction } from '../../utils/model-utils.js';

class Audit extends Model {
static async create(values, options) {
Expand Down Expand Up @@ -109,7 +109,7 @@ Audit.init(ModelTypes, {
});

Audit.addHook('beforeFind', async () => {
await waitForSyncRegistries();
await waitForSyncRegistriesTransaction();
});

export { Audit };
5 changes: 0 additions & 5 deletions src/models/organizations/organizations.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import { getConfig } from '../../utils/config-loader';
const { USE_SIMULATOR, AUTO_SUBSCRIBE_FILESTORE } = getConfig().APP;

import ModelTypes from './organizations.modeltypes.cjs';
import { waitForSyncRegistries } from '../../utils/model-utils.js';

class Organization extends Model {
static async getHomeOrg(includeAddress = true) {
Expand Down Expand Up @@ -470,8 +469,4 @@ Organization.init(ModelTypes, {
timestamps: true,
});

Organization.addHook('beforeFind', async () => {
await waitForSyncRegistries();
});

export { Organization };
98 changes: 79 additions & 19 deletions src/tasks/sync-registries.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
generateGenerationIndex,
} from '../utils/sync-migration-utils';
import {
processingUpdateTransactionMutex,
processingSyncRegistriesTransactionMutex,
syncRegistriesTaskMutex,
} from '../utils/model-utils.js';

Expand Down Expand Up @@ -89,19 +89,65 @@ const processJob = async () => {
});

for (const organization of organizations) {
await syncOrganizationAudit(organization);
if (CONFIG.USE_SIMULATOR || process.env.NODE_ENV === 'test') {
await syncOrganizationAudit(organization);
} else {
const mostRecentOrgAuditRecord = await Audit.findOne({
where: {
orgUid: organization.orgUid,
},
order: [['createdAt', 'DESC']],
limit: 1,
raw: true,
});

// verify that the latest organization root hash is up to date with the audit records. attempt correction.
if (mostRecentOrgAuditRecord.rootHash !== organization.registryHash) {
logger.warn(
`latest root hash in org table for organization ${organization.name} (orgUid ${organization.orgUid}) does not match the audit records. attempting to correct`,
);
try {
const result = await Organization.update(
{ registryHash: mostRecentOrgAuditRecord.rootHash },
{
where: { orgUid: organization.orgUid },
},
);

if (result?.length) {
logger.info(
`registry hash record corrected for ${organization.name} (orgUid ${organization.orgUid}). proceeding with audit sync`,
);
const correctedOrganizationRecord = await Organization.findOne({
where: { orgUid: organization.orgUid },
});

await syncOrganizationAudit(correctedOrganizationRecord);
} else {
throw new Error('organizations update query affected 0 records');
}
} catch (error) {
logger.error(
`failed to update organization table record for ${organization.name} (orgUid ${organization.orgUid}) with correct root hash. Something is wrong. Skipping audit sync and trying again shortly. Error: ${error}`,
);
}
} else {
// normal state, proceed with audit sync
await syncOrganizationAudit(organization);
}
}
}
};

async function createTransaction(callback, afterCommitCallbacks) {
async function updateAuditTransaction(callback, afterCommitCallbacks) {
let result = null;

let transaction;
let mirrorTransaction;

logger.info('Starting sequelize transaction and acquiring transaction mutex');
const releaseTransactionMutex =
await processingUpdateTransactionMutex.acquire();
await processingSyncRegistriesTransactionMutex.acquire();

try {
// Start a transaction
Expand Down Expand Up @@ -131,8 +177,9 @@ async function createTransaction(callback, afterCommitCallbacks) {
} catch (error) {
// Roll back the transaction if an error occurs
if (transaction) {
logger.error('Rolling back transaction');
console.error(error);
logger.error(
`encountered error syncing organization audit. Rolling back transaction. Error: ${error}`,
);
await transaction.rollback();
}
} finally {
Expand Down Expand Up @@ -439,7 +486,10 @@ const syncOrganizationAudit = async (organization) => {
// by not processing the DELETE for that record.
const optimizedKvDiff = optimizeAndSortKvDiff(kvDiff);

const updateTransaction = async (transaction, mirrorTransaction) => {
const createAndProcessTransaction = async (
transaction,
mirrorTransaction,
) => {
logger.info(
`Syncing ${organization.name} generation ${toBeProcessedDatalayerGenerationIndex} (orgUid ${organization.orgUid}, registryId ${organization.registryId})`,
);
Expand Down Expand Up @@ -536,18 +586,8 @@ const syncOrganizationAudit = async (organization) => {
}

// Create the Audit record
logger.debug(
`creating audit model entry for ${organization.name} transacton`,
);
logger.debug(`creating audit model transaction entry`);
await Audit.create(auditData, { transaction, mirrorTransaction });
await Organization.update(
{ registryHash: rootToBeProcessed.root_hash },
{
where: { orgUid: organization.orgUid },
transaction,
mirrorTransaction,
},
);
}
}
};
Expand All @@ -556,7 +596,27 @@ const syncOrganizationAudit = async (organization) => {
afterCommitCallbacks.push(truncateStaging);
}

await createTransaction(updateTransaction, afterCommitCallbacks);
const transactionSucceeded = await createAndProcessTransaction(
updateAuditTransaction,
afterCommitCallbacks,
);

if (transactionSucceeded) {
logger.debug(
`updateAuditTransaction successfully completed and committed audit updates for ${organization.name} (orgUid: ${organization.orgUid}, registryId: ${organization.registryId}) generation index ${toBeProcessedDatalayerGenerationIndex}. updating registry hash to ${rootToBeProcessed.root_hash}`,
);

await Organization.update(
{ registryHash: rootToBeProcessed.root_hash },
{
where: { orgUid: organization.orgUid },
},
);
} else {
logger.debug(
`updateAuditTransaction failed to complete and commit audit updates for ${organization.name} (orgUid: ${organization.orgUid}, registryId: ${organization.registryId}) generation index ${toBeProcessedDatalayerGenerationIndex}`,
);
}
} catch (error) {
logger.error('Error syncing org audit', error);
}
Expand Down
9 changes: 5 additions & 4 deletions src/utils/model-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ import Sequelize from 'sequelize';

import { Mutex } from 'async-mutex';

export async function waitForSyncRegistries() {
if (processingUpdateTransactionMutex.isLocked()) {
export async function waitForSyncRegistriesTransaction() {
if (processingSyncRegistriesTransactionMutex.isLocked()) {
// when the mutex is acquired, the current sync transaction has completed
const releaseMutex = await processingUpdateTransactionMutex.acquire();
const releaseMutex =
await processingSyncRegistriesTransactionMutex.acquire();
await releaseMutex();
}
}
Expand All @@ -24,7 +25,7 @@ export const syncRegistriesTaskMutex = new Mutex();
* audit model update transactions are large and lock the DB for long periods.
* @type {Mutex}
*/
export const processingUpdateTransactionMutex = new Mutex();
export const processingSyncRegistriesTransactionMutex = new Mutex();

export function formatModelAssociationName(model) {
if (model == null || model.model == null) return '';
Expand Down

0 comments on commit a3f33f7

Please sign in to comment.