From 937fab002fc397565dad728521ba2a097cef4811 Mon Sep 17 00:00:00 2001 From: Djordje Kovacevic Date: Thu, 9 Nov 2023 12:47:11 +0100 Subject: [PATCH 1/4] Added service agreements operational db migration --- ot-node.js | 298 +++--------------- src/migration/base-migration.js | 24 ++ src/migration/migration-executor.js | 268 ++++++++++++++++ ...ervice-agreements-op-database-migration.js | 129 ++++++++ .../service-agreement-repository.js | 13 + .../repository/repository-module-manager.js | 8 + 6 files changed, 493 insertions(+), 247 deletions(-) create mode 100644 src/migration/migration-executor.js create mode 100644 src/migration/service-agreements-op-database-migration.js diff --git a/ot-node.js b/ot-node.js index 716669d52d..deff16483c 100644 --- a/ot-node.js +++ b/ot-node.js @@ -5,20 +5,11 @@ import { createRequire } from 'module'; import { execSync } from 'child_process'; import DependencyInjection from './src/service/dependency-injection.js'; import Logger from './src/logger/logger.js'; -import { MIN_NODE_VERSION, NODE_ENVIRONMENTS } from './src/constants/constants.js'; +import { MIN_NODE_VERSION } from './src/constants/constants.js'; import FileService from './src/service/file-service.js'; import OtnodeUpdateCommand from './src/commands/common/otnode-update-command.js'; import OtAutoUpdater from './src/modules/auto-updater/implementation/ot-auto-updater.js'; -import PullBlockchainShardingTableMigration from './src/migration/pull-sharding-table-migration.js'; -import TripleStoreUserConfigurationMigration from './src/migration/triple-store-user-configuration-migration.js'; -import TelemetryModuleUserConfigurationMigration from './src/migration/telemetry-module-user-configuration-migration.js'; -import PrivateAssetsMetadataMigration from './src/migration/private-assets-metadata-migration.js'; -import ServiceAgreementsMetadataMigration from './src/migration/service-agreements-metadata-migration.js'; -import RemoveAgreementStartEndTimeMigration from './src/migration/remove-agreement-start-end-time-migration.js'; -import MarkOldBlockchainEventsAsProcessedMigration from './src/migration/mark-old-blockchain-events-as-processed-migration.js'; -import TripleStoreMetadataMigration from './src/migration/triple-store-metadata-migration.js'; -import RemoveOldEpochCommandsMigration from './src/migration/remove-old-epoch-commands-migration.js'; -import PendingStorageMigration from './src/migration/pending-storage-migration.js'; +import MigrationExecutor from './src/migration/migration-executor.js'; const require = createRequire(import.meta.url); const pjson = require('./package.json'); @@ -36,8 +27,14 @@ class OTNode { async start() { await this.checkForUpdate(); await this.removeUpdateFile(); - await this.executeTripleStoreUserConfigurationMigration(); - await this.executeTelemetryModuleUserConfigurationMigration(); + await MigrationExecutor.executeTripleStoreUserConfigurationMigration( + this.logger, + this.config, + ); + await MigrationExecutor.executeTelemetryModuleUserConfigurationMigration( + this.logger, + this.config, + ); this.logger.info(' ██████╗ ████████╗███╗ ██╗ ██████╗ ██████╗ ███████╗'); this.logger.info('██╔═══██╗╚══██╔══╝████╗ ██║██╔═══██╗██╔══██╗██╔════╝'); this.logger.info('██║ ██║ ██║ ██╔██╗ ██║██║ ██║██║ ██║█████╗'); @@ -54,15 +51,47 @@ class OTNode { this.initializeEventEmitter(); await this.initializeModules(); - await this.executePullShardingTableMigration(); - await this.executePrivateAssetsMetadataMigration(); - await this.executeRemoveAgreementStartEndTimeMigration(); - await this.executeMarkOldBlockchainEventsAsProcessedMigration(); - await this.executeTripleStoreMetadataMigration(); - await this.executeServiceAgreementsMetadataMigration(); - await this.executeRemoveOldEpochCommandsMigration(); - await this.executePendingStorageMigration(); - + await MigrationExecutor.executePullShardingTableMigration( + this.container, + this.logger, + this.config, + ); + await MigrationExecutor.executePrivateAssetsMetadataMigration( + this.container, + this.logger, + this.config, + ); + await MigrationExecutor.executeRemoveAgreementStartEndTimeMigration( + this.container, + this.logger, + this.config, + ); + await MigrationExecutor.executeMarkOldBlockchainEventsAsProcessedMigration( + this.container, + this.logger, + this.config, + ); + await MigrationExecutor.executeTripleStoreMetadataMigration( + this.container, + this.logger, + this.config, + ); + await MigrationExecutor.executeServiceAgreementsMetadataMigration( + this.container, + this.logger, + this.config, + ); + await MigrationExecutor.executeRemoveOldEpochCommandsMigration( + this.container, + this.logger, + this.config, + ); + await MigrationExecutor.executePendingStorageMigration(this.logger, this.config); + await MigrationExecutor.executeServiceAgreementsOpDatabaseMigration( + this.container, + this.logger, + this.config, + ); await this.createProfiles(); await this.initializeCommandExecutor(); @@ -292,211 +321,6 @@ class OTNode { }); } - async executePrivateAssetsMetadataMigration() { - if ( - process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || - process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST - ) - return; - const blockchainModuleManager = this.container.resolve('blockchainModuleManager'); - const tripleStoreService = this.container.resolve('tripleStoreService'); - const serviceAgreementService = this.container.resolve('serviceAgreementService'); - const ualService = this.container.resolve('ualService'); - const dataService = this.container.resolve('dataService'); - - const migration = new PrivateAssetsMetadataMigration( - 'privateAssetsMetadataMigration', - this.logger, - this.config, - tripleStoreService, - blockchainModuleManager, - serviceAgreementService, - ualService, - dataService, - ); - - if (!(await migration.migrationAlreadyExecuted())) { - await migration.migrate(); - this.logger.info('Node will now restart!'); - this.stop(1); - } - } - - async executeTelemetryModuleUserConfigurationMigration() { - if ( - process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || - process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST - ) - return; - - const migration = new TelemetryModuleUserConfigurationMigration( - 'telemetryModuleUserConfigurationMigration', - this.logger, - this.config, - ); - if (!(await migration.migrationAlreadyExecuted())) { - await migration.migrate(); - this.logger.info('Node will now restart!'); - this.stop(1); - } - } - - async executeTripleStoreUserConfigurationMigration() { - if ( - process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || - process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST - ) - return; - - const migration = new TripleStoreUserConfigurationMigration( - 'tripleStoreUserConfigurationMigration', - this.logger, - this.config, - ); - if (!(await migration.migrationAlreadyExecuted())) { - await migration.migrate(); - this.logger.info('Node will now restart!'); - this.stop(1); - } - } - - async executePullShardingTableMigration() { - if ( - process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || - process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST - ) - return; - - const blockchainModuleManager = this.container.resolve('blockchainModuleManager'); - const repositoryModuleManager = this.container.resolve('repositoryModuleManager'); - const validationModuleManager = this.container.resolve('validationModuleManager'); - - const migration = new PullBlockchainShardingTableMigration( - 'pullShardingTableMigrationV612', - this.logger, - this.config, - repositoryModuleManager, - blockchainModuleManager, - validationModuleManager, - ); - if (!(await migration.migrationAlreadyExecuted())) { - await migration.migrate(); - } - } - - async executeServiceAgreementsMetadataMigration() { - if ( - process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || - process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST - ) - return; - - const blockchainModuleManager = this.container.resolve('blockchainModuleManager'); - const repositoryModuleManager = this.container.resolve('repositoryModuleManager'); - const tripleStoreService = this.container.resolve('tripleStoreService'); - const serviceAgreementService = this.container.resolve('serviceAgreementService'); - const ualService = this.container.resolve('ualService'); - - const migration = new ServiceAgreementsMetadataMigration( - 'serviceAgreementsMetadataMigration', - this.logger, - this.config, - tripleStoreService, - blockchainModuleManager, - repositoryModuleManager, - serviceAgreementService, - ualService, - ); - if (!(await migration.migrationAlreadyExecuted())) { - await migration.migrate(); - } - } - - async executeRemoveAgreementStartEndTimeMigration() { - if ( - process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || - process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST - ) - return; - - const tripleStoreService = this.container.resolve('tripleStoreService'); - - const migration = new RemoveAgreementStartEndTimeMigration( - 'removeAgreementStartEndTimeMigration', - this.logger, - this.config, - tripleStoreService, - ); - if (!(await migration.migrationAlreadyExecuted())) { - await migration.migrate(); - } - } - - async executeTripleStoreMetadataMigration() { - if ( - process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || - process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST - ) - return; - const blockchainModuleManager = this.container.resolve('blockchainModuleManager'); - const tripleStoreService = this.container.resolve('tripleStoreService'); - const serviceAgreementService = this.container.resolve('serviceAgreementService'); - const ualService = this.container.resolve('ualService'); - const dataService = this.container.resolve('dataService'); - - const migration = new TripleStoreMetadataMigration( - 'tripleStoreMetadataMigration', - this.logger, - this.config, - tripleStoreService, - blockchainModuleManager, - serviceAgreementService, - ualService, - dataService, - ); - - if (!(await migration.migrationAlreadyExecuted())) { - await migration.migrate(); - } - } - - async executeRemoveOldEpochCommandsMigration() { - if ( - process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || - process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST - ) - return; - - const repositoryModuleManager = this.container.resolve('repositoryModuleManager'); - - const migration = new RemoveOldEpochCommandsMigration( - 'removeOldEpochCommandsMigration', - this.logger, - this.config, - repositoryModuleManager, - ); - if (!(await migration.migrationAlreadyExecuted())) { - await migration.migrate(); - } - } - - async executePendingStorageMigration() { - if ( - process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || - process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST - ) - return; - - const migration = new PendingStorageMigration( - 'pendingStorageMigration', - this.logger, - this.config, - ); - if (!(await migration.migrationAlreadyExecuted())) { - await migration.migrate(); - } - } - async initializeShardingTableService() { try { const shardingTableService = this.container.resolve('shardingTableService'); @@ -533,26 +357,6 @@ class OTNode { this.logger.info('Stopping node...'); process.exit(code); } - - async executeMarkOldBlockchainEventsAsProcessedMigration() { - if ( - process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || - process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST - ) - return; - - const repositoryModuleManager = this.container.resolve('repositoryModuleManager'); - - const migration = new MarkOldBlockchainEventsAsProcessedMigration( - 'markOldBlockchainEventsAsProcessedMigration', - this.logger, - this.config, - repositoryModuleManager, - ); - if (!(await migration.migrationAlreadyExecuted())) { - await migration.migrate(); - } - } } export default OTNode; diff --git a/src/migration/base-migration.js b/src/migration/base-migration.js index 0b17fa6577..afa94b65b6 100644 --- a/src/migration/base-migration.js +++ b/src/migration/base-migration.js @@ -52,6 +52,30 @@ class BaseMigration { ); } + async getMigrationInfo() { + const migrationFolderPath = this.fileService.getMigrationFolderPath(); + const migrationInfoFileName = `${this.migrationName}_info`; + const migrationInfoPath = path.join(migrationFolderPath, migrationInfoFileName); + let migrationInfo = null; + if (await this.fileService.pathExists(migrationInfoPath)) { + migrationInfo = await this.fileService + .readFile(migrationInfoPath, true) + .catch(() => {}); + } + return migrationInfo; + } + + async saveMigrationInfo(migrationInfo) { + const migrationFolderPath = this.fileService.getMigrationFolderPath(); + const migrationInfoFileName = `${this.migrationName}_info`; + await this.fileService.writeContentsToFile( + migrationFolderPath, + migrationInfoFileName, + JSON.stringify(migrationInfo), + false, + ); + } + async executeMigration() { throw Error('Execute migration method not implemented'); } diff --git a/src/migration/migration-executor.js b/src/migration/migration-executor.js new file mode 100644 index 0000000000..aed10123e0 --- /dev/null +++ b/src/migration/migration-executor.js @@ -0,0 +1,268 @@ +import { NODE_ENVIRONMENTS } from '../constants/constants.js'; +import PullBlockchainShardingTableMigration from './pull-sharding-table-migration.js'; +import PrivateAssetsMetadataMigration from './private-assets-metadata-migration.js'; +import TelemetryModuleUserConfigurationMigration from './telemetry-module-user-configuration-migration.js'; +import TripleStoreUserConfigurationMigration from './triple-store-user-configuration-migration.js'; +import ServiceAgreementsMetadataMigration from './service-agreements-metadata-migration.js'; +import RemoveAgreementStartEndTimeMigration from './remove-agreement-start-end-time-migration.js'; +import TripleStoreMetadataMigration from './triple-store-metadata-migration.js'; +import RemoveOldEpochCommandsMigration from './remove-old-epoch-commands-migration.js'; +import PendingStorageMigration from './pending-storage-migration.js'; +import MarkOldBlockchainEventsAsProcessedMigration from './mark-old-blockchain-events-as-processed-migration.js'; +import ServiceAgreementsOpDatabaseMigration from './service-agreements-op-database-migration.js'; + +class MigrationExecutor { + static async executePullShardingTableMigration(container, logger, config) { + if ( + process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || + process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST + ) + return; + + const blockchainModuleManager = container.resolve('blockchainModuleManager'); + const repositoryModuleManager = container.resolve('repositoryModuleManager'); + const validationModuleManager = container.resolve('validationModuleManager'); + + const migration = new PullBlockchainShardingTableMigration( + 'pullShardingTableMigrationV612', + logger, + config, + repositoryModuleManager, + blockchainModuleManager, + validationModuleManager, + ); + if (!(await migration.migrationAlreadyExecuted())) { + await migration.migrate(); + } + } + + static async executePrivateAssetsMetadataMigration(container, logger, config) { + if ( + process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || + process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST + ) + return; + const blockchainModuleManager = container.resolve('blockchainModuleManager'); + const tripleStoreService = container.resolve('tripleStoreService'); + const serviceAgreementService = container.resolve('serviceAgreementService'); + const ualService = container.resolve('ualService'); + const dataService = container.resolve('dataService'); + + const migration = new PrivateAssetsMetadataMigration( + 'privateAssetsMetadataMigration', + logger, + config, + tripleStoreService, + blockchainModuleManager, + serviceAgreementService, + ualService, + dataService, + ); + + if (!(await migration.migrationAlreadyExecuted())) { + await migration.migrate(); + logger.info('Node will now restart!'); + MigrationExecutor.exitNode(1); + } + } + + static async executeTelemetryModuleUserConfigurationMigration(logger, config) { + if ( + process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || + process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST + ) + return; + + const migration = new TelemetryModuleUserConfigurationMigration( + 'telemetryModuleUserConfigurationMigration', + logger, + config, + ); + if (!(await migration.migrationAlreadyExecuted())) { + await migration.migrate(); + logger.info('Node will now restart!'); + MigrationExecutor.exitNode(1); + } + } + + static async executeTripleStoreUserConfigurationMigration(logger, config) { + if ( + process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || + process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST + ) + return; + + const migration = new TripleStoreUserConfigurationMigration( + 'tripleStoreUserConfigurationMigration', + logger, + config, + ); + if (!(await migration.migrationAlreadyExecuted())) { + await migration.migrate(); + logger.info('Node will now restart!'); + MigrationExecutor.exitNode(1); + } + } + + static async executeServiceAgreementsMetadataMigration(container, logger, config) { + if ( + process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || + process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST + ) + return; + + const blockchainModuleManager = container.resolve('blockchainModuleManager'); + const repositoryModuleManager = container.resolve('repositoryModuleManager'); + const tripleStoreService = container.resolve('tripleStoreService'); + const serviceAgreementService = container.resolve('serviceAgreementService'); + const ualService = container.resolve('ualService'); + + const migration = new ServiceAgreementsMetadataMigration( + 'serviceAgreementsMetadataMigration', + logger, + config, + tripleStoreService, + blockchainModuleManager, + repositoryModuleManager, + serviceAgreementService, + ualService, + ); + if (!(await migration.migrationAlreadyExecuted())) { + await migration.migrate(); + } + } + + static async executeRemoveAgreementStartEndTimeMigration(container, logger, config) { + if ( + process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || + process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST + ) + return; + + const tripleStoreService = container.resolve('tripleStoreService'); + + const migration = new RemoveAgreementStartEndTimeMigration( + 'removeAgreementStartEndTimeMigration', + logger, + config, + tripleStoreService, + ); + if (!(await migration.migrationAlreadyExecuted())) { + await migration.migrate(); + } + } + + static async executeTripleStoreMetadataMigration(container, logger, config) { + if ( + process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || + process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST + ) + return; + const blockchainModuleManager = container.resolve('blockchainModuleManager'); + const tripleStoreService = container.resolve('tripleStoreService'); + const serviceAgreementService = container.resolve('serviceAgreementService'); + const ualService = container.resolve('ualService'); + const dataService = container.resolve('dataService'); + + const migration = new TripleStoreMetadataMigration( + 'tripleStoreMetadataMigration', + logger, + config, + tripleStoreService, + blockchainModuleManager, + serviceAgreementService, + ualService, + dataService, + ); + + if (!(await migration.migrationAlreadyExecuted())) { + await migration.migrate(); + } + } + + static async executeRemoveOldEpochCommandsMigration(container, logger, config) { + if ( + process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || + process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST + ) + return; + + const repositoryModuleManager = container.resolve('repositoryModuleManager'); + + const migration = new RemoveOldEpochCommandsMigration( + 'removeOldEpochCommandsMigration', + logger, + config, + repositoryModuleManager, + ); + if (!(await migration.migrationAlreadyExecuted())) { + await migration.migrate(); + } + } + + static async executePendingStorageMigration(logger, config) { + if ( + process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || + process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST + ) + return; + + const migration = new PendingStorageMigration('pendingStorageMigration', logger, config); + if (!(await migration.migrationAlreadyExecuted())) { + await migration.migrate(); + } + } + + static async executeMarkOldBlockchainEventsAsProcessedMigration(container, logger, config) { + if ( + process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || + process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST + ) + return; + + const repositoryModuleManager = container.resolve('repositoryModuleManager'); + + const migration = new MarkOldBlockchainEventsAsProcessedMigration( + 'markOldBlockchainEventsAsProcessedMigration', + logger, + config, + repositoryModuleManager, + ); + if (!(await migration.migrationAlreadyExecuted())) { + await migration.migrate(); + } + } + + static async executeServiceAgreementsOpDatabaseMigration(container, logger, config) { + // todo should we also exclude testnet? + if ( + process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || + process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST + ) + return; + + const blockchainModuleManager = container.resolve('blockchainModuleManager'); + const repositoryModuleManager = container.resolve('repositoryModuleManager'); + const serviceAgreementService = container.resolve('serviceAgreementService'); + const ualService = container.resolve('ualService'); + + const migration = new ServiceAgreementsOpDatabaseMigration( + 'serviceAgreementsOpDatabaseMigration', + logger, + config, + blockchainModuleManager, + repositoryModuleManager, + serviceAgreementService, + ualService, + ); + if (!(await migration.migrationAlreadyExecuted())) { + await migration.migrate(); + } + } + + static exitNode(code = 0) { + process.exit(code); + } +} + +export default MigrationExecutor; diff --git a/src/migration/service-agreements-op-database-migration.js b/src/migration/service-agreements-op-database-migration.js new file mode 100644 index 0000000000..5e678920c8 --- /dev/null +++ b/src/migration/service-agreements-op-database-migration.js @@ -0,0 +1,129 @@ +/* eslint-disable no-await-in-loop */ +import BaseMigration from './base-migration.js'; + +class ServiceAgreementsOpDatabaseMigration extends BaseMigration { + constructor( + migrationName, + logger, + config, + blockchainModuleManager, + repositoryModuleManager, + serviceAgreementService, + ualService, + ) { + super(migrationName, logger, config); + this.blockchainModuleManager = blockchainModuleManager; + this.repositoryModuleManager = repositoryModuleManager; + this.serviceAgreementService = serviceAgreementService; + this.ualService = ualService; + } + + async executeMigration() { + let migrationInfo = await this.getMigrationInfo(); + if (!migrationInfo?.lastProcessedTokenId) { + migrationInfo = { + lastProcessedTokenId: 0, + }; + } + + const numberOfActiveServiceAgreements = + await this.repositoryModuleManager.getNumberOfActiveServiceAgreements(); + let processed = 0; + const batchSize = + numberOfActiveServiceAgreements > 10000 ? 10000 : numberOfActiveServiceAgreements; + const concurrency = 3; + + while (processed < numberOfActiveServiceAgreements) { + const serviceAgreementsToProcess = + await this.repositoryModuleManager.getServiceAgreements( + migrationInfo.lastProcessedTokenId, + batchSize, + ); + + let promises = []; + + for (const serviceAgreement of serviceAgreementsToProcess) { + promises.push(this.processServiceAgreement(serviceAgreement)); + + if (promises.length >= concurrency) { + // eslint-disable-next-line no-await-in-loop + await Promise.all(promises); + promises = []; + migrationInfo.lastProcessedTokenId = + serviceAgreementsToProcess.slice(-1)[0].tokenId; + await this.saveMigrationInfo(migrationInfo); + this.logger.trace( + `${this.migrationName} Last token id processed ${migrationInfo.lastProcessedTokenId}.`, + ); + } + } + + processed += batchSize; + } + } + + async processServiceAgreement(serviceAgreement) { + const updatedServiceAgreement = serviceAgreement; + let updated = false; + const keyword = await this.ualService.calculateLocationKeyword( + updatedServiceAgreement.blockchainId, + updatedServiceAgreement.assetStorageContractAddress, + updatedServiceAgreement.tokenId, + ); + + if (serviceAgreement.keyword !== keyword) { + updatedServiceAgreement.keyword = keyword; + updated = true; + } + + const agreementId = await this.serviceAgreementService.generateId( + updatedServiceAgreement.blockchainId, + updatedServiceAgreement.assetStorageContractAddress, + updatedServiceAgreement.tokenId, + keyword, + updatedServiceAgreement.hashFunctionId, + ); + + if (serviceAgreement.agreementId !== agreementId) { + updatedServiceAgreement.agreementId = agreementId; + updated = true; + } + + const assertionIds = this.blockchainModuleManager.getAssertionIds( + serviceAgreement.assetStorageContractAddress, + serviceAgreement.tokenId, + ); + const stateIndex = assertionIds.length - 1; + + if (updatedServiceAgreement.assertionId !== assertionIds[stateIndex]) { + updatedServiceAgreement.assertionId = assertionIds[stateIndex]; + updated = true; + } + + if (updatedServiceAgreement.stateIndex !== stateIndex) { + updatedServiceAgreement.stateIndex = stateIndex; + updated = true; + } + if (updated) { + await this.repositoryModuleManager.updateServiceAgreementRecord( + updatedServiceAgreement.blockchainId, + updatedServiceAgreement.assetStorageContractAddress, + updatedServiceAgreement.tokenId, + updatedServiceAgreement.agreementId, + updatedServiceAgreement.startTime, + updatedServiceAgreement.epochsNumber, + updatedServiceAgreement.epochLength, + updatedServiceAgreement.scoreFunctionId, + updatedServiceAgreement.proofWindowOffsetPerc, + updatedServiceAgreement.hashFunctionId, + updatedServiceAgreement.keyword, + updatedServiceAgreement.assertionId, + updatedServiceAgreement.stateIndex, + updatedServiceAgreement.lastCommitEpoch, + updatedServiceAgreement.lastProofEpoch, + ); + } + } +} + +export default ServiceAgreementsOpDatabaseMigration; diff --git a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js index fdca41bf7c..f077a44247 100644 --- a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js @@ -202,6 +202,19 @@ class ServiceAgreementRepository { raw: true, }); } + + async getNumberOfActiveServiceAgreements() { + return this.model.count(); + } + + async getServiceAgreements(fromTokenId, batchSize) { + return this.model.findAll({ + where: { + tokenId: { [Sequelize.Op.gt]: fromTokenId }, + }, + limit: batchSize, + }); + } } export default ServiceAgreementRepository; diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index 74f832901b..a45af46c1a 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -405,6 +405,14 @@ class RepositoryModuleManager extends BaseModuleManager { epochsNumber, ); } + + async getNumberOfActiveServiceAgreements() { + return this.getRepository('service_agreement').getNumberOfActiveServiceAgreements(); + } + + async getServiceAgreements(fromTokenId, batchSize) { + return this.getRepository('service_agreement').getServiceAgreements(fromTokenId, batchSize); + } } export default RepositoryModuleManager; From 19a942269dfd98f84618bd864c308bdb7eb49db6 Mon Sep 17 00:00:00 2001 From: Djordje Kovacevic Date: Fri, 10 Nov 2023 11:27:47 +0100 Subject: [PATCH 2/4] Update migration --- ...ervice-agreements-op-database-migration.js | 74 ++++++++++--------- .../service-agreement-repository.js | 22 +++++- .../repository/repository-module-manager.js | 12 +++ 3 files changed, 72 insertions(+), 36 deletions(-) diff --git a/src/migration/service-agreements-op-database-migration.js b/src/migration/service-agreements-op-database-migration.js index 5e678920c8..d5dd752a5b 100644 --- a/src/migration/service-agreements-op-database-migration.js +++ b/src/migration/service-agreements-op-database-migration.js @@ -1,6 +1,10 @@ /* eslint-disable no-await-in-loop */ import BaseMigration from './base-migration.js'; +let wrongAgreementsCount = 0; +const MAX_BATCH_SIZE = 10000; +const CONCURRENCY = 3; + class ServiceAgreementsOpDatabaseMigration extends BaseMigration { constructor( migrationName, @@ -30,8 +34,9 @@ class ServiceAgreementsOpDatabaseMigration extends BaseMigration { await this.repositoryModuleManager.getNumberOfActiveServiceAgreements(); let processed = 0; const batchSize = - numberOfActiveServiceAgreements > 10000 ? 10000 : numberOfActiveServiceAgreements; - const concurrency = 3; + numberOfActiveServiceAgreements > MAX_BATCH_SIZE + ? MAX_BATCH_SIZE + : numberOfActiveServiceAgreements; while (processed < numberOfActiveServiceAgreements) { const serviceAgreementsToProcess = @@ -39,15 +44,22 @@ class ServiceAgreementsOpDatabaseMigration extends BaseMigration { migrationInfo.lastProcessedTokenId, batchSize, ); - let promises = []; for (const serviceAgreement of serviceAgreementsToProcess) { promises.push(this.processServiceAgreement(serviceAgreement)); - if (promises.length >= concurrency) { - // eslint-disable-next-line no-await-in-loop - await Promise.all(promises); + if ( + promises.length >= CONCURRENCY || + promises.length === serviceAgreementsToProcess.length + ) { + try { + await Promise.all(promises); + } catch (error) { + this.logger.warn( + `Unable to process invalid service agreements. Error: ${error}`, + ); + } promises = []; migrationInfo.lastProcessedTokenId = serviceAgreementsToProcess.slice(-1)[0].tokenId; @@ -60,15 +72,19 @@ class ServiceAgreementsOpDatabaseMigration extends BaseMigration { processed += batchSize; } + + this.logger.trace( + `${this.migrationName} Total number of processed agreements ${processed}. Found invalid agreements: ${wrongAgreementsCount}`, + ); } async processServiceAgreement(serviceAgreement) { - const updatedServiceAgreement = serviceAgreement; + const updatedServiceAgreement = {}; let updated = false; const keyword = await this.ualService.calculateLocationKeyword( - updatedServiceAgreement.blockchainId, - updatedServiceAgreement.assetStorageContractAddress, - updatedServiceAgreement.tokenId, + serviceAgreement.blockchainId, + serviceAgreement.assetStorageContractAddress, + serviceAgreement.tokenId, ); if (serviceAgreement.keyword !== keyword) { @@ -77,11 +93,11 @@ class ServiceAgreementsOpDatabaseMigration extends BaseMigration { } const agreementId = await this.serviceAgreementService.generateId( - updatedServiceAgreement.blockchainId, - updatedServiceAgreement.assetStorageContractAddress, - updatedServiceAgreement.tokenId, + serviceAgreement.blockchainId, + serviceAgreement.assetStorageContractAddress, + serviceAgreement.tokenId, keyword, - updatedServiceAgreement.hashFunctionId, + serviceAgreement.hashFunctionId, ); if (serviceAgreement.agreementId !== agreementId) { @@ -89,39 +105,31 @@ class ServiceAgreementsOpDatabaseMigration extends BaseMigration { updated = true; } - const assertionIds = this.blockchainModuleManager.getAssertionIds( + const assertionIds = await this.blockchainModuleManager.getAssertionIds( + serviceAgreement.blockchain, serviceAgreement.assetStorageContractAddress, serviceAgreement.tokenId, ); const stateIndex = assertionIds.length - 1; - if (updatedServiceAgreement.assertionId !== assertionIds[stateIndex]) { + if (serviceAgreement.assertionId !== assertionIds[stateIndex]) { updatedServiceAgreement.assertionId = assertionIds[stateIndex]; updated = true; } - if (updatedServiceAgreement.stateIndex !== stateIndex) { + if (serviceAgreement.stateIndex !== stateIndex) { updatedServiceAgreement.stateIndex = stateIndex; updated = true; } if (updated) { - await this.repositoryModuleManager.updateServiceAgreementRecord( - updatedServiceAgreement.blockchainId, - updatedServiceAgreement.assetStorageContractAddress, - updatedServiceAgreement.tokenId, - updatedServiceAgreement.agreementId, - updatedServiceAgreement.startTime, - updatedServiceAgreement.epochsNumber, - updatedServiceAgreement.epochLength, - updatedServiceAgreement.scoreFunctionId, - updatedServiceAgreement.proofWindowOffsetPerc, - updatedServiceAgreement.hashFunctionId, - updatedServiceAgreement.keyword, - updatedServiceAgreement.assertionId, - updatedServiceAgreement.stateIndex, - updatedServiceAgreement.lastCommitEpoch, - updatedServiceAgreement.lastProofEpoch, + await this.repositoryModuleManager.updateServiceAgreementForTokenId( + serviceAgreement.tokenId, + updatedServiceAgreement.agreementId ?? serviceAgreement.agreementId, + updatedServiceAgreement.keyword ?? serviceAgreement.keyword, + updatedServiceAgreement.assertionId ?? serviceAgreement.assertionId, + updatedServiceAgreement.stateIndex ?? serviceAgreement.stateIndex, ); + wrongAgreementsCount += 1; } } } diff --git a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js index f077a44247..d0cfc6e50d 100644 --- a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js @@ -23,7 +23,7 @@ class ServiceAgreementRepository { async updateServiceAgreementRecord( blockchainId, - contract, + assetStorageContractAddress, tokenId, agreementId, startTime, @@ -40,7 +40,7 @@ class ServiceAgreementRepository { ) { return this.model.upsert({ blockchainId, - assetStorageContractAddress: contract, + assetStorageContractAddress, tokenId, agreementId, startTime, @@ -57,6 +57,22 @@ class ServiceAgreementRepository { }); } + async updateServiceAgreementForTokenId(tokenId, agreementId, keyword, assertionId, stateIndex) { + return this.model.update( + { + agreementId, + keyword, + assertionId, + stateIndex, + }, + { + where: { + tokenId, + }, + }, + ); + } + async bulkCreateServiceAgreementRecords(serviceAgreements) { return this.model.bulkCreate(serviceAgreements, { validate: true, @@ -210,7 +226,7 @@ class ServiceAgreementRepository { async getServiceAgreements(fromTokenId, batchSize) { return this.model.findAll({ where: { - tokenId: { [Sequelize.Op.gt]: fromTokenId }, + tokenId: { [Sequelize.Op.gte]: fromTokenId }, }, limit: batchSize, }); diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index a45af46c1a..5cf4a83153 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -325,6 +325,18 @@ class RepositoryModuleManager extends BaseModuleManager { } } + async updateServiceAgreementForTokenId(tokenId, agreementId, keyword, assertionId, stateIndex) { + if (this.initialized) { + return this.getRepository('service_agreement').updateServiceAgreementForTokenId( + tokenId, + agreementId, + keyword, + assertionId, + stateIndex, + ); + } + } + async bulkCreateServiceAgreementRecords(records) { if (this.initialized) { return this.getRepository('service_agreement').bulkCreateServiceAgreementRecords( From 6a849897db55c2e63cf6a8676ae2472f810b2e68 Mon Sep 17 00:00:00 2001 From: Djordje Kovacevic Date: Tue, 14 Nov 2023 10:37:55 +0100 Subject: [PATCH 3/4] Resovled issue with failing bdd tests --- .../http-api/v0/get-http-api-controller-v0.js | 4 +- .../blockchain/implementation/web3-service.js | 48 +++++++------------ src/service/file-service.js | 3 +- 3 files changed, 19 insertions(+), 36 deletions(-) diff --git a/src/controllers/http-api/v0/get-http-api-controller-v0.js b/src/controllers/http-api/v0/get-http-api-controller-v0.js index b14dbc5e17..f743fb48a6 100644 --- a/src/controllers/http-api/v0/get-http-api-controller-v0.js +++ b/src/controllers/http-api/v0/get-http-api-controller-v0.js @@ -87,9 +87,7 @@ class GetController extends BaseController { OPERATION_ID_STATUS.GET.GET_INIT_END, ); } catch (error) { - this.logger.error( - `Error while initializing get data: ${error.message}. ${error.stack}`, - ); + this.logger.error(`Error while initializing get data: ${error.message}.`); await this.operationService.markOperationAsFailed( operationId, diff --git a/src/modules/blockchain/implementation/web3-service.js b/src/modules/blockchain/implementation/web3-service.js index 5f7478ec2f..c53d56d2cd 100644 --- a/src/modules/blockchain/implementation/web3-service.js +++ b/src/modules/blockchain/implementation/web3-service.js @@ -215,12 +215,18 @@ class Web3Service { let message = `${functionName}(${inputs}) ${method} has been successfully executed; `; if (info.backend.result !== null && method !== 'estimateGas') { - const decodedResultData = this._decodeResultData( - inputData.slice(0, 10), - info.backend.result, - contractInstance.interface, - ); - message += `Result: ${decodedResultData}; `; + try { + const decodedResultData = this._decodeResultData( + inputData.slice(0, 10), + info.backend.result, + contractInstance.interface, + ); + message += `Result: ${decodedResultData}; `; + } catch (error) { + this.logger.warn( + `Unable to decode result data for. Message: ${message}`, + ); + } } message += `RPC: ${info.backend.provider.connection.url}.`; @@ -409,9 +415,8 @@ class Web3Service { (input, i) => `${input.name}=${args[i]}`, ); - // eslint-disable-next-line no-await-in-loop - await this.handleError( - Error(`Call ${functionName}(${inputs}) failed, reason: ${decodedErrorData}`), + throw new Error( + `Call ${functionName}(${inputs}) failed, reason: ${decodedErrorData}`, ); } } @@ -440,10 +445,8 @@ class Web3Service { (input, i) => `${input.name}=${args[i]}`, ); - await this.handleError( - Error( - `Gas estimation ${functionName}(${inputs}) failed, reason: ${decodedErrorData}`, - ), + throw new Error( + `Gas estimation ${functionName}(${inputs}) failed, reason: ${decodedErrorData}`, ); } @@ -479,10 +482,7 @@ class Web3Service { gasPrice = Math.ceil(gasPrice * 1.2); transactionRetried = true; } else { - await this.handleError( - Error(`Transaction reverted, reason: ${decodedErrorData}`), - functionName, - ); + throw new Error(`Transaction reverted, reason: ${decodedErrorData}`); } } } @@ -1023,20 +1023,6 @@ class Web3Service { await this.initializeContracts(); } - async handleError(error, functionName) { - let isRpcError = false; - try { - await this.provider.getNetwork(); - } catch (rpcError) { - isRpcError = true; - this.logger.warn( - `Unable to execute smart contract function ${functionName} using Fallback RPC Provider.`, - ); - await this.restartService(); - } - if (!isRpcError) throw error; - } - async getUpdateCommitWindowDuration() { const commitWindowDurationPerc = await this.callContractFunction( this.ParametersStorageContract, diff --git a/src/service/file-service.js b/src/service/file-service.js index 68096b91f6..7d92ce8a84 100644 --- a/src/service/file-service.js +++ b/src/service/file-service.js @@ -90,8 +90,7 @@ class FileService { } async removeFolder(folderPath) { - this.logger.trace(`Removing folder at path: ${folderPath}`); - + this.logger.debug(`Removing folder at path: ${folderPath}`); try { await rm(folderPath, { recursive: true }); return true; From e81bc805ad3d2ab31c84b5754788fe998b55ef9a Mon Sep 17 00:00:00 2001 From: Djordje Kovacevic Date: Tue, 14 Nov 2023 10:52:54 +0100 Subject: [PATCH 4/4] PR comments resolved, disabled revert to older version on error --- index.js | 44 +++++++++---------- .../service-agreement-repository.js | 1 + 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/index.js b/index.js index 7a6cda7eef..fdc26099db 100644 --- a/index.js +++ b/index.js @@ -1,10 +1,6 @@ /* eslint-disable no-console */ import 'dotenv/config'; import fs from 'fs-extra'; -import path from 'path'; -import appRootPath from 'app-root-path'; -import { execSync } from 'child_process'; -import semver from 'semver'; import OTNode from './ot-node.js'; import { NODE_ENVIRONMENTS } from './src/constants/constants.js'; @@ -30,26 +26,26 @@ process.env.NODE_ENV = await node.start(); } catch (e) { console.error(`Error occurred while start ot-node, error message: ${e}. ${e.stack}`); - console.error(`Trying to recover from older version`); - if (process.env.NODE_ENV !== NODE_ENVIRONMENTS.DEVELOPMENT) { - const rootPath = path.join(appRootPath.path, '..'); - const oldVersionsDirs = (await fs.promises.readdir(rootPath, { withFileTypes: true })) - .filter((dirent) => dirent.isDirectory()) - .map((dirent) => dirent.name) - .filter((name) => semver.valid(name) && !appRootPath.path.includes(name)); - - if (oldVersionsDirs.length === 0) { - console.error( - `Failed to start OT-Node, no backup code available. Error message: ${e.message}`, - ); - process.exit(1); - } - - const oldVersion = oldVersionsDirs.sort(semver.compare).pop(); - const oldversionPath = path.join(rootPath, oldVersion); - execSync(`ln -sfn ${oldversionPath} ${rootPath}/current`); - await fs.promises.rm(appRootPath.path, { force: true, recursive: true }); - } + // console.error(`Trying to recover from older version`); + // if (process.env.NODE_ENV !== NODE_ENVIRONMENTS.DEVELOPMENT) { + // const rootPath = path.join(appRootPath.path, '..'); + // const oldVersionsDirs = (await fs.promises.readdir(rootPath, { withFileTypes: true })) + // .filter((dirent) => dirent.isDirectory()) + // .map((dirent) => dirent.name) + // .filter((name) => semver.valid(name) && !appRootPath.path.includes(name)); + // + // if (oldVersionsDirs.length === 0) { + // console.error( + // `Failed to start OT-Node, no backup code available. Error message: ${e.message}`, + // ); + // process.exit(1); + // } + // + // const oldVersion = oldVersionsDirs.sort(semver.compare).pop(); + // const oldversionPath = path.join(rootPath, oldVersion); + // execSync(`ln -sfn ${oldversionPath} ${rootPath}/current`); + // await fs.promises.rm(appRootPath.path, { force: true, recursive: true }); + // } process.exit(1); } })(); diff --git a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js index d0cfc6e50d..deec201839 100644 --- a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js @@ -229,6 +229,7 @@ class ServiceAgreementRepository { tokenId: { [Sequelize.Op.gte]: fromTokenId }, }, limit: batchSize, + order: [['token_id', 'asc']], }); } }