diff --git a/src/models/audit/audit.model.js b/src/models/audit/audit.model.js index 31a6610c..5e758530 100644 --- a/src/models/audit/audit.model.js +++ b/src/models/audit/audit.model.js @@ -7,7 +7,7 @@ import { AuditMirror } from './audit.model.mirror'; import ModelTypes from './audit.modeltypes.cjs'; import findDuplicateIssuancesSql from './sql/find-duplicate-issuances.sql.js'; import { Organization } from '../organizations/index.js'; -import { waitForSyncRegistries } from '../../utils/model-utils.js'; +import { waitForSyncRegistriesTransaction } from '../../utils/model-utils.js'; class Audit extends Model { static async create(values, options) { @@ -109,7 +109,7 @@ Audit.init(ModelTypes, { }); Audit.addHook('beforeFind', async () => { - await waitForSyncRegistries(); + await waitForSyncRegistriesTransaction(); }); export { Audit }; diff --git a/src/models/organizations/organizations.model.js b/src/models/organizations/organizations.model.js index 5c4cc850..2e62da8f 100644 --- a/src/models/organizations/organizations.model.js +++ b/src/models/organizations/organizations.model.js @@ -19,7 +19,6 @@ import { getConfig } from '../../utils/config-loader'; const { USE_SIMULATOR, AUTO_SUBSCRIBE_FILESTORE } = getConfig().APP; import ModelTypes from './organizations.modeltypes.cjs'; -import { waitForSyncRegistries } from '../../utils/model-utils.js'; class Organization extends Model { static async getHomeOrg(includeAddress = true) { @@ -470,8 +469,4 @@ Organization.init(ModelTypes, { timestamps: true, }); -Organization.addHook('beforeFind', async () => { - await waitForSyncRegistries(); -}); - export { Organization }; diff --git a/src/tasks/sync-registries.js b/src/tasks/sync-registries.js index 2e164e72..86c53fea 100644 --- a/src/tasks/sync-registries.js +++ b/src/tasks/sync-registries.js @@ -22,7 +22,7 @@ import { generateGenerationIndex, } from '../utils/sync-migration-utils'; import { - processingUpdateTransactionMutex, + processingSyncRegistriesTransactionMutex, syncRegistriesTaskMutex, } from '../utils/model-utils.js'; @@ -89,11 +89,57 @@ const processJob = async () => { }); for (const organization of organizations) { - await syncOrganizationAudit(organization); + if (CONFIG.USE_SIMULATOR || process.env.NODE_ENV === 'test') { + await syncOrganizationAudit(organization); + } else { + const mostRecentOrgAuditRecord = await Audit.findOne({ + where: { + orgUid: organization.orgUid, + }, + order: [['createdAt', 'DESC']], + limit: 1, + raw: true, + }); + + // verify that the latest organization root hash is up to date with the audit records. attempt correction. + if (mostRecentOrgAuditRecord.rootHash !== organization.registryHash) { + logger.warn( + `latest root hash in org table for organization ${organization.name} (orgUid ${organization.orgUid}) does not match the audit records. attempting to correct`, + ); + try { + const result = await Organization.update( + { registryHash: mostRecentOrgAuditRecord.rootHash }, + { + where: { orgUid: organization.orgUid }, + }, + ); + + if (result?.length) { + logger.info( + `registry hash record corrected for ${organization.name} (orgUid ${organization.orgUid}). proceeding with audit sync`, + ); + const correctedOrganizationRecord = await Organization.findOne({ + where: { orgUid: organization.orgUid }, + }); + + await syncOrganizationAudit(correctedOrganizationRecord); + } else { + throw new Error('organizations update query affected 0 records'); + } + } catch (error) { + logger.error( + `failed to update organization table record for ${organization.name} (orgUid ${organization.orgUid}) with correct root hash. Something is wrong. Skipping audit sync and trying again shortly. Error: ${error}`, + ); + } + } else { + // normal state, proceed with audit sync + await syncOrganizationAudit(organization); + } + } } }; -async function createTransaction(callback, afterCommitCallbacks) { +async function updateAuditTransaction(callback, afterCommitCallbacks) { let result = null; let transaction; @@ -101,7 +147,7 @@ async function createTransaction(callback, afterCommitCallbacks) { logger.info('Starting sequelize transaction and acquiring transaction mutex'); const releaseTransactionMutex = - await processingUpdateTransactionMutex.acquire(); + await processingSyncRegistriesTransactionMutex.acquire(); try { // Start a transaction @@ -131,8 +177,9 @@ async function createTransaction(callback, afterCommitCallbacks) { } catch (error) { // Roll back the transaction if an error occurs if (transaction) { - logger.error('Rolling back transaction'); - console.error(error); + logger.error( + `encountered error syncing organization audit. Rolling back transaction. Error: ${error}`, + ); await transaction.rollback(); } } finally { @@ -439,7 +486,10 @@ const syncOrganizationAudit = async (organization) => { // by not processing the DELETE for that record. const optimizedKvDiff = optimizeAndSortKvDiff(kvDiff); - const updateTransaction = async (transaction, mirrorTransaction) => { + const createAndProcessTransaction = async ( + transaction, + mirrorTransaction, + ) => { logger.info( `Syncing ${organization.name} generation ${toBeProcessedDatalayerGenerationIndex} (orgUid ${organization.orgUid}, registryId ${organization.registryId})`, ); @@ -536,18 +586,8 @@ const syncOrganizationAudit = async (organization) => { } // Create the Audit record - logger.debug( - `creating audit model entry for ${organization.name} transacton`, - ); + logger.debug(`creating audit model transaction entry`); await Audit.create(auditData, { transaction, mirrorTransaction }); - await Organization.update( - { registryHash: rootToBeProcessed.root_hash }, - { - where: { orgUid: organization.orgUid }, - transaction, - mirrorTransaction, - }, - ); } } }; @@ -556,7 +596,27 @@ const syncOrganizationAudit = async (organization) => { afterCommitCallbacks.push(truncateStaging); } - await createTransaction(updateTransaction, afterCommitCallbacks); + const transactionSucceeded = await createAndProcessTransaction( + updateAuditTransaction, + afterCommitCallbacks, + ); + + if (transactionSucceeded) { + logger.debug( + `updateAuditTransaction successfully completed and committed audit updates for ${organization.name} (orgUid: ${organization.orgUid}, registryId: ${organization.registryId}) generation index ${toBeProcessedDatalayerGenerationIndex}. updating registry hash to ${rootToBeProcessed.root_hash}`, + ); + + await Organization.update( + { registryHash: rootToBeProcessed.root_hash }, + { + where: { orgUid: organization.orgUid }, + }, + ); + } else { + logger.debug( + `updateAuditTransaction failed to complete and commit audit updates for ${organization.name} (orgUid: ${organization.orgUid}, registryId: ${organization.registryId}) generation index ${toBeProcessedDatalayerGenerationIndex}`, + ); + } } catch (error) { logger.error('Error syncing org audit', error); } diff --git a/src/utils/model-utils.js b/src/utils/model-utils.js index 02decbd0..0d77887a 100644 --- a/src/utils/model-utils.js +++ b/src/utils/model-utils.js @@ -3,10 +3,11 @@ import Sequelize from 'sequelize'; import { Mutex } from 'async-mutex'; -export async function waitForSyncRegistries() { - if (processingUpdateTransactionMutex.isLocked()) { +export async function waitForSyncRegistriesTransaction() { + if (processingSyncRegistriesTransactionMutex.isLocked()) { // when the mutex is acquired, the current sync transaction has completed - const releaseMutex = await processingUpdateTransactionMutex.acquire(); + const releaseMutex = + await processingSyncRegistriesTransactionMutex.acquire(); await releaseMutex(); } } @@ -24,7 +25,7 @@ export const syncRegistriesTaskMutex = new Mutex(); * audit model update transactions are large and lock the DB for long periods. * @type {Mutex} */ -export const processingUpdateTransactionMutex = new Mutex(); +export const processingSyncRegistriesTransactionMutex = new Mutex(); export function formatModelAssociationName(model) { if (model == null || model.model == null) return '';