diff --git a/config/config.json b/config/config.json index 444662006..d06397ee1 100644 --- a/config/config.json +++ b/config/config.json @@ -170,8 +170,12 @@ "config": { "blockchains": ["hardhat1:31337", "hardhat2:31337"], "rpcEndpoints": { - "hardhat1:31337:": ["http://localhost:8545"], + "hardhat1:31337": ["http://localhost:8545"], "hardhat2:31337": ["http://localhost:9545"] + }, + "hubContractAddress": { + "hardhat1:31337": "0x5FbDB2315678afecb367f032d93F642f64180aa3", + "hardhat2:31337": "0x5FbDB2315678afecb367f032d93F642f64180aa3" } } } @@ -354,8 +358,12 @@ "config": { "blockchains": ["hardhat1:31337", "hardhat2:31337"], "rpcEndpoints": { - "hardhat1:31337:": ["http://localhost:8545"], + "hardhat1:31337": ["http://localhost:8545"], "hardhat2:31337": ["http://localhost:9545"] + }, + "hubContractAddress": { + "hardhat1:31337": "0x5FbDB2315678afecb367f032d93F642f64180aa3", + "hardhat2:31337": "0x5FbDB2315678afecb367f032d93F642f64180aa3" } } } @@ -539,6 +547,9 @@ "blockchains": ["base:84532"], "rpcEndpoints": { "base:84532": ["https://sepolia.base.org"] + }, + "hubContractAddress": { + "base:84532": "0xCca0eA14540588A09c85cD6A6Fc53eA3A7010692" } } } @@ -721,6 +732,9 @@ "blockchains": ["base:84532"], "rpcEndpoints": { "base:84532": ["https://sepolia.base.org"] + }, + "hubContractAddress": { + "base:84532": "0xAB4A4794Fc1F415C24807B947280aCa8dC492238" } } } @@ -930,6 +944,9 @@ "https://astrosat.origintrail.network/", "https://astrosat-2.origintrail.network/" ] + }, + "hubContractAddress": { + "otp:2043": "0x5fA7916c48Fe6D5F1738d12Ad234b78c90B4cAdA" } } } diff --git a/installer/installer.sh b/installer/installer.sh index 77d6ab75d..3d0640db4 100644 --- a/installer/installer.sh +++ b/installer/installer.sh @@ -406,6 +406,37 @@ EOF # Configure Base-Sepolia configure_blockchain "base" $base_blockchain_id + # Function to configure blockchain events services + configure_blockchain_events_services() { + local blockchain=$1 + local blockchain_id=$2 + + print_color $CYAN "🔧 Configuring Blockchain Events Service for Base Sepolia (Testnet)..." + + read -p "$(print_color $YELLOW "Enter your RPC endpoint: ")" RPC_ENDPOINT + print_color $GREEN "✅ RPC endpoint: $RPC_ENDPOINT" + + local jq_filter=$(cat < $CONFIG_DIR/origintrail_noderc_tmp + mv $CONFIG_DIR/origintrail_noderc_tmp $CONFIG_DIR/.origintrail_noderc + chmod 600 $CONFIG_DIR/.origintrail_noderc + } + + # Configure blockchain events service for Base Sepolia + configure_blockchain_events_services "base" $base_blockchain_id + # Now execute npm install after configuring wallets print_color $CYAN "📦 Installing npm packages..." perform_step npm ci --omit=dev --ignore-scripts "Executing npm install" diff --git a/ot-node.js b/ot-node.js index 5da371b70..a1366d3b2 100644 --- a/ot-node.js +++ b/ot-node.js @@ -44,6 +44,7 @@ class OTNode { this.initializeEventEmitter(); await this.initializeModules(); + this.initializeBlockchainEventsService(); await this.initializeParanets(); await this.createProfiles(); @@ -51,8 +52,6 @@ class OTNode { await this.initializeCommandExecutor(); await this.initializeShardingTableService(); - this.initializeBlockchainEventsService(); - await this.initializeRouters(); await this.startNetworkModule(); await this.initializeBLSService(); diff --git a/src/commands/blockchain-event-listener/blockchain-event-listener-command.js b/src/commands/blockchain-event-listener/blockchain-event-listener-command.js index c6fc83971..7226ea544 100644 --- a/src/commands/blockchain-event-listener/blockchain-event-listener-command.js +++ b/src/commands/blockchain-event-listener/blockchain-event-listener-command.js @@ -1,44 +1,29 @@ -import { setTimeout } from 'timers/promises'; import Command from '../command.js'; import { + BLOCKCHAIN_EVENT_PRIORITIES, CONTENT_ASSET_HASH_FUNCTION_ID, CONTRACTS, - TRIPLE_STORE_REPOSITORIES, - NODE_ENVIRONMENTS, - PENDING_STORAGE_REPOSITORIES, - CONTRACT_EVENTS, - MAXIMUM_FETCH_EVENTS_FAILED_COUNT, - DELAY_BETWEEN_FAILED_FETCH_EVENTS_MILLIS, - CONTRACT_EVENT_TO_GROUP_MAPPING, - GROUPED_CONTRACT_EVENTS, - ZERO_BYTES32, + CONTRACTS_EVENTS, + CONTRACTS_EVENTS_LISTENED, + DEFAULT_BLOCKCHAIN_EVENT_PRIORITY, ERROR_TYPE, OPERATION_ID_STATUS, + SHARDING_TABLE_RELATED_EVENTS, } from '../../constants/constants.js'; -const fetchEventsFailedCount = {}; -const eventNames = Object.values(CONTRACT_EVENTS).flat(); - class BlockchainEventListenerCommand extends Command { constructor(ctx) { super(ctx); this.blockchainModuleManager = ctx.blockchainModuleManager; this.repositoryModuleManager = ctx.repositoryModuleManager; - this.tripleStoreService = ctx.tripleStoreService; - this.pendingStorageService = ctx.pendingStorageService; this.ualService = ctx.ualService; this.hashingService = ctx.hashingService; - this.serviceAgreementService = ctx.serviceAgreementService; this.shardingTableService = ctx.shardingTableService; - this.paranetService = ctx.paranetService; this.blockchainEventsService = ctx.blockchainEventsService; this.fileService = ctx.fileService; - this.dataService = ctx.dataService; this.operationIdService = ctx.operationIdService; this.commandExecutor = ctx.commandExecutor; - this.eventGroupsBuffer = {}; - this.errorType = ERROR_TYPE.BLOCKCHAIN_EVENT_LISTENER_ERROR; } @@ -47,24 +32,11 @@ class BlockchainEventListenerCommand extends Command { try { await this.fetchAndHandleBlockchainEvents(blockchainId); - fetchEventsFailedCount[blockchainId] = 0; } catch (e) { - fetchEventsFailedCount[blockchainId] += 1; - - if (fetchEventsFailedCount[blockchainId] >= MAXIMUM_FETCH_EVENTS_FAILED_COUNT) { - this.blockchainModuleManager.removeImplementation(blockchainId); - - const errorMessage = `Unable to fetch new events for blockchain: ${blockchainId}. Error message: ${e.message}`; - this.logger.error(`${errorMessage} blockchain implementation removed.`); - return Command.empty(); - } - this.logger.error( - `Failed to get and process blockchain events for blockchain: ${blockchainId}. Error: ${e}`, + `Failed to fetch and process blockchain events for blockchain: ${blockchainId}. Error: ${e}`, ); - await setTimeout(DELAY_BETWEEN_FAILED_FETCH_EVENTS_MILLIS); - // Try again after a delay return Command.repeat(); } @@ -72,77 +44,59 @@ class BlockchainEventListenerCommand extends Command { } async fetchAndHandleBlockchainEvents(blockchainId) { - const isDevEnvironment = [NODE_ENVIRONMENTS.DEVELOPMENT, NODE_ENVIRONMENTS.TEST].includes( - process.env.NODE_ENV, - ); + const currentBlock = (await this.blockchainEventsService.getBlock(blockchainId)).number; - const currentBlock = await this.blockchainModuleManager.getBlockNumber(blockchainId); + const contractEventsData = await Promise.all( + Object.values(CONTRACTS_EVENTS).map(({ contract, events }) => + this.getContractEvents(blockchainId, contract, currentBlock, events), + ), + ); - let contractsEventsConfig = [ - { contract: CONTRACTS.SHARDING_TABLE_CONTRACT, events: CONTRACT_EVENTS.SHARDING_TABLE }, - { contract: CONTRACTS.STAKING_CONTRACT, events: CONTRACT_EVENTS.STAKING }, - { contract: CONTRACTS.PROFILE_CONTRACT, events: CONTRACT_EVENTS.PROFILE }, - { - contract: CONTRACTS.COMMIT_MANAGER_V1_U1_CONTRACT, - events: CONTRACT_EVENTS.COMMIT_MANAGER_V1, - }, - { - contract: CONTRACTS.PARAMETERS_STORAGE_CONTRACT, - events: CONTRACT_EVENTS.PARAMETERS_STORAGE, - }, - { contract: CONTRACTS.LOG2PLDSF_CONTRACT, events: CONTRACT_EVENTS.LOG2PLDSF }, - { contract: CONTRACTS.LINEAR_SUM_CONTRACT, events: CONTRACT_EVENTS.LINEAR_SUM }, - ]; + if (contractEventsData.some(({ eventsMissed }) => eventsMissed)) { + await this.shardingTableService.pullBlockchainShardingTable(blockchainId, true); + this.filterShardingTableRelatedEvents(contractEventsData); + } - const contractLastCheckedBlock = {}; - if (isDevEnvironment) { - // handling sharding table node added events first for tests and local network setup - // because of race condition for node added and ask updated events - - const { - events: shardingTableEvents, - contractName, - lastCheckedBlock, - } = await this.getContractEvents( - blockchainId, - CONTRACTS.SHARDING_TABLE_CONTRACT, - currentBlock, - CONTRACT_EVENTS.SHARDING_TABLE, - ); - contractLastCheckedBlock[contractName] = lastCheckedBlock; - await this.handleBlockchainEvents( + const unprocessedEvents = + await this.repositoryModuleManager.getAllUnprocessedBlockchainEvents( + CONTRACTS_EVENTS_LISTENED, blockchainId, - shardingTableEvents, - contractLastCheckedBlock, ); - contractsEventsConfig = contractsEventsConfig.filter( - (item) => item.contract !== CONTRACTS.SHARDING_TABLE_CONTRACT, - ); - } else { - contractsEventsConfig.push({ - contract: CONTRACTS.HUB_CONTRACT, - events: CONTRACT_EVENTS.HUB, - }); + if (unprocessedEvents.length > 0) { + this.logger.trace(`Found ${unprocessedEvents.length} unprocessed blockchain events.`); } - const contractEventsData = await Promise.all( - contractsEventsConfig.map(({ contract, events }) => - this.getContractEvents(blockchainId, contract, currentBlock, events), - ), - ); + const contractLastCheckedBlock = {}; + const events = unprocessedEvents; - const contractEvents = []; - for (const { events, contractName, lastCheckedBlock } of contractEventsData) { - contractEvents.push(events); + for (const { + contractName, + lastCheckedBlock, + events: contractEvents, + } of contractEventsData) { + const prioritizedEvents = contractEvents.map((event) => ({ + ...event, + priority: + BLOCKCHAIN_EVENT_PRIORITIES[event.event] || DEFAULT_BLOCKCHAIN_EVENT_PRIORITY, + })); + + // Collect all prioritized events + events.push(...prioritizedEvents); + + // Update the last checked block for this contract contractLastCheckedBlock[contractName] = lastCheckedBlock; } - await this.handleBlockchainEvents( - blockchainId, - contractEvents.flat(), - contractLastCheckedBlock, - ); + if (events.length !== 0) { + this.logger.trace( + `Storing ${events.length} events for blockchain ${blockchainId} in the database.`, + ); + await this.repositoryModuleManager.insertBlockchainEvents(events); + await this.processEventsByPriority(events); + } + + await this.updateLastCheckedBlocks(blockchainId, contractLastCheckedBlock); } async getContractEvents(blockchain, contractName, currentBlock, eventsToFilter) { @@ -151,485 +105,238 @@ class BlockchainEventListenerCommand extends Command { contractName, ); - const contract = this.blockchainModuleManager.getContract(blockchain, contractName); - const result = await this.blockchainEventsService.getPastEvents( blockchain, contractName, - contract, eventsToFilter, lastCheckedBlockObject?.lastCheckedBlock ?? 0, currentBlock, ); - if (!result.eventsMissed) { - await this.shardingTableService.pullBlockchainShardingTable(blockchain, true); - } + return { ...result, contractName }; + } + + filterShardingTableRelatedEvents(contractEventsData) { + contractEventsData.forEach((data) => { + if (SHARDING_TABLE_RELATED_EVENTS[data.contractName]) { + // eslint-disable-next-line no-param-reassign + data.events = data.events.filter( + (event) => + !SHARDING_TABLE_RELATED_EVENTS[data.contractName].includes(event.event), + ); + } + }); + } + + async processEventsByPriority(events) { + const queues = {}; + const eventsByPriority = events.forEach((event) => { + if (!queues[event.priority]) { + queues[event.priority] = []; + } + queues[event.priority].push(event); + }); - const { events, lastCheckedBlock } = result; + // Process each priority level sequentially + const priorityLevels = Object.keys(eventsByPriority).sort((a, b) => a - b); + for (const priority of priorityLevels) { + const priorityLevelEvents = eventsByPriority[priority]; - return { events, contractName, lastCheckedBlock }; + // eslint-disable-next-line no-await-in-loop + await Promise.all(priorityLevelEvents.map((event) => this.processEvent(event))); + } } - async handleBlockchainEvents(blockchainId, events, contractLastCheckedBlock) { - const eventsForProcessing = events.filter((event) => eventNames.includes(event.event)); + async processEvent(event) { + const handlerFunctionName = `handle${event.event}Event`; - // Store new events in the DB - if (eventsForProcessing?.length) { - this.logger.trace( - `${eventsForProcessing.length} blockchain events caught on blockchain ${blockchainId}.`, + if (typeof this[handlerFunctionName] !== 'function') { + this.logger.warn(`No handler for event type: ${event.event}`); + return; + } + + this.logger.trace(`Processing event ${event.event} in block ${event.block}.`); + try { + await this[handlerFunctionName](event); + } catch (error) { + this.logger.error( + `Error processing event ${event.event} in block ${event.block}: ${error.message}`, ); - await this.repositoryModuleManager.insertBlockchainEvents(eventsForProcessing); } + } - // Update last checked block after inserting into db + async updateLastCheckedBlocks(blockchainId, contractLastCheckedBlock) { await Promise.all( Object.entries(contractLastCheckedBlock).map(([contractName, lastCheckedBlock]) => this.repositoryModuleManager.updateLastCheckedBlock( blockchainId, lastCheckedBlock, - Date.now(0), + Date.now(), contractName, ), ), ); - - // Get unprocessed events from the DB - const unprocessedEvents = - await this.repositoryModuleManager.getAllUnprocessedBlockchainEvents( - eventNames, - blockchainId, - ); - - if (unprocessedEvents?.length) { - this.logger.trace( - `Processing ${unprocessedEvents.length} blockchain events on blockchain ${blockchainId}.`, - ); - let batchedEvents = {}; - let currentBlockNumber = 0; - for (const event of unprocessedEvents) { - if (event.block !== currentBlockNumber) { - // eslint-disable-next-line no-await-in-loop - await this.handleBlockBatchedEvents(batchedEvents); - batchedEvents = {}; - currentBlockNumber = event.block; - } - - // Check if event should be grouped with other event - const eventsGroupName = CONTRACT_EVENT_TO_GROUP_MAPPING[event.event]; - if (eventsGroupName) { - // Get Events Group object containing predefined events and Grouping Key (Event Argument) - const eventsGroup = GROUPED_CONTRACT_EVENTS[eventsGroupName]; - // Get value of the Grouping Key from the Event - const groupingKeyValue = JSON.parse(event.data)[eventsGroup.groupingKey]; - - if (!this.eventGroupsBuffer[blockchainId][eventsGroupName]) { - this.eventGroupsBuffer[blockchainId][eventsGroupName] = {}; - } - - if (!this.eventGroupsBuffer[blockchainId][eventsGroupName][groupingKeyValue]) { - this.eventGroupsBuffer[blockchainId][eventsGroupName][groupingKeyValue] = - []; - } - - // Push event to the buffer until Events Group is not full - this.eventGroupsBuffer[blockchainId][eventsGroupName][groupingKeyValue].push( - event, - ); - - // Mark event as processed - // TODO: There should be a smarter way to do this, because it will cause troubles - // in case node goes offline while only catched some of the events from the group - // and not all of them. Buffer will be cleared and event is already marked as processed. - // eslint-disable-next-line no-await-in-loop - await this.repositoryModuleManager.markBlockchainEventsAsProcessed([event]); - - // When all expected Events from the Event Group are collected - if ( - this.eventGroupsBuffer[blockchainId][eventsGroupName][groupingKeyValue] - .length === eventsGroup.events.length - ) { - if (!batchedEvents[eventsGroupName]) { - batchedEvents[eventsGroupName] = []; - } - - // Add Events Group to the Processing Queue - batchedEvents[eventsGroupName].push( - this.eventGroupsBuffer[blockchainId][eventsGroupName][groupingKeyValue], - ); - - // Remove Events Group from the Buffer - delete this.eventGroupsBuffer[blockchainId][eventsGroupName][ - groupingKeyValue - ]; - } - } else if (batchedEvents[event.event]) { - batchedEvents[event.event].push(event); - } else { - batchedEvents[event.event] = [event]; - } - } - - await this.handleBlockBatchedEvents(batchedEvents); - } } - async handleBlockBatchedEvents(batchedEvents) { - const handleBlockEventsPromises = []; - for (const [eventName, blockEvents] of Object.entries(batchedEvents)) { - handleBlockEventsPromises.push(this.handleBlockEvents(eventName, blockEvents)); + async handleParameterChangedEvent(event) { + const { blockchainId, contract, data } = event; + const { parameterName, parameterValue } = JSON.parse(data); + switch (contract) { + case CONTRACTS.PARAMETERS_STORAGE: + this.blockchainModuleManager.setContractCallCache( + blockchainId, + CONTRACTS.PARAMETERS_STORAGE, + parameterName, + parameterValue, + ); + break; + default: + this.logger.warn( + `Unable to handle parameter changed event. Unknown contract name ${event.contract}`, + ); } - // eslint-disable-next-line no-await-in-loop - await Promise.all(handleBlockEventsPromises); } - async handleBlockEvents(eventName, blockEvents) { - const handlerFunctionName = `handle${eventName}Events`; - if (!this[handlerFunctionName]) return; - this.logger.trace(`${blockEvents.length} ${eventName} events caught.`); - try { - await this[handlerFunctionName](blockEvents); - await this.repositoryModuleManager.markBlockchainEventsAsProcessed(blockEvents); - } catch (error) { - this.logger.warn( - `Error while processing events: ${eventName}. Error: ${error.message}`, - ); - } + handleNewContractEvent(event) { + const { contractName, newContractAddress } = JSON.parse(event.data); + this.blockchainModuleManager.initializeContract( + event.blockchain, + contractName, + newContractAddress, + ); } - async handleParameterChangedEvents(blockEvents) { - for (const event of blockEvents) { - const { blockchainId, contract, data } = event; - const { parameterName, parameterValue } = JSON.parse(data); - switch (contract) { - case CONTRACTS.LOG2PLDSF_CONTRACT: - // This invalidates contracts parameter - // TODO: Create function for contract call cache invalidation - this.blockchainModuleManager.setContractCallCache( - blockchainId, - CONTRACTS.LOG2PLDSF_CONTRACT, - parameterName, - null, - ); - break; - case CONTRACTS.LINEAR_SUM_CONTRACT: - this.blockchainModuleManager.setContractCallCache( - blockchainId, - CONTRACTS.LINEAR_SUM_CONTRACT, - parameterName, - null, - ); - break; - case CONTRACTS.PARAMETERS_STORAGE_CONTRACT: - this.blockchainModuleManager.setContractCallCache( - blockchainId, - CONTRACTS.PARAMETERS_STORAGE_CONTRACT, - parameterName, - parameterValue, - ); - break; - default: - this.logger.warn( - `Unable to handle parameter changed event. Unknown contract name ${event.contract}`, - ); - } - } - } + async handleContractChangedEvent(event) { + const { contractName, newContractAddress } = JSON.parse(event.data); + this.blockchainModuleManager.initializeContract( + event.blockchain, + contractName, + newContractAddress, + ); - handleNewContractEvents(blockEvents) { - for (const event of blockEvents) { - const { contractName, newContractAddress } = JSON.parse(event.data); - this.blockchainModuleManager.initializeContract( - event.blockchainId, - contractName, - newContractAddress, - ); + if (contractName === CONTRACTS.SHARDING_TABLE) { + await this.shardingTableService.pullBlockchainShardingTable(event.blockchain, true); } } - async handleContractChangedEvents(blockEvents) { - await Promise.all( - blockEvents.map(async (event) => { - const { contractName, newContractAddress } = JSON.parse(event.data); - this.blockchainModuleManager.initializeContract( - event.blockchainId, - contractName, - newContractAddress, - ); - - if (contractName === CONTRACTS.SHARDING_TABLE_CONTRACT) { - await this.shardingTableService.pullBlockchainShardingTable( - event.blockchainId, - true, - ); - } - }), + handleNewAssetStorageEvent(event) { + const { newContractAddress } = JSON.parse(event.data); + this.blockchainModuleManager.initializeAssetStorageContract( + event.blockchain, + newContractAddress, ); } - handleNewAssetStorageEvents(blockEvents) { - for (const event of blockEvents) { - const { newContractAddress } = JSON.parse(event.data); - this.blockchainModuleManager.initializeAssetStorageContract( - event.blockchainId, - newContractAddress, - ); - } - } - - handleAssetStorageChangedEvents(blockEvents) { - for (const event of blockEvents) { - const { newContractAddress } = JSON.parse(event.data); - this.blockchainModuleManager.initializeAssetStorageContract( - event.blockchainId, - newContractAddress, - ); - } + handleAssetStorageChangedEvent(event) { + const { newContractAddress } = JSON.parse(event.data); + this.blockchainModuleManager.initializeAssetStorageContract( + event.blockchain, + newContractAddress, + ); } - async handleNodeAddedEvents(blockEvents) { - const peerRecords = await Promise.all( - blockEvents.map(async (event) => { - const eventData = JSON.parse(event.data); + async handleNodeAddedEvent(event) { + const eventData = JSON.parse(event.data); - const nodeId = this.blockchainModuleManager.convertHexToAscii( - event.blockchainId, - eventData.nodeId, - ); + const nodeId = this.blockchainModuleManager.convertHexToAscii( + event.blockchain, + eventData.nodeId, + ); - const sha256 = await this.hashingService.callHashFunction( - CONTENT_ASSET_HASH_FUNCTION_ID, - nodeId, - ); + const sha256 = await this.hashingService.callHashFunction( + CONTENT_ASSET_HASH_FUNCTION_ID, + nodeId, + ); - return { - peerId: nodeId, - blockchainId: event.blockchainId, - ask: this.blockchainModuleManager.convertFromWei( - event.blockchainId, - eventData.ask, - ), - stake: this.blockchainModuleManager.convertFromWei( - event.blockchainId, - eventData.stake, - ), - lastSeen: new Date(0), - sha256, - }; - }), + await this.repositoryModuleManager.createPeerRecord( + nodeId, + event.blockchain, + this.blockchainModuleManager.convertFromWei(event.blockchain, eventData.ask), + this.blockchainModuleManager.convertFromWei(event.blockchain, eventData.stake), + new Date(0), + sha256, ); - await this.repositoryModuleManager.createManyPeerRecords(peerRecords); } - async handleNodeRemovedEvents(blockEvents) { - await Promise.all( - blockEvents.map(async (event) => { - const eventData = JSON.parse(event.data); + async handleNodeRemovedEvent(event) { + const eventData = JSON.parse(event.data); - const nodeId = this.blockchainModuleManager.convertHexToAscii( - event.blockchainId, - eventData.nodeId, - ); + const nodeId = this.blockchainModuleManager.convertHexToAscii( + event.blockchain, + eventData.nodeId, + ); - this.logger.trace(`Removing peer id: ${nodeId} from sharding table.`); + this.logger.trace(`Removing peer id: ${nodeId} from sharding table.`); - await this.repositoryModuleManager.removePeerRecord(event.blockchainId, nodeId); - }), - ); + await this.repositoryModuleManager.removePeerRecord(event.blockchain, nodeId); } - async handleStakeIncreasedEvents(blockEvents) { - await Promise.all( - blockEvents.map(async (event) => { - const eventData = JSON.parse(event.data); + async handleStakeIncreasedEvent(event) { + const eventData = JSON.parse(event.data); - const nodeId = this.blockchainModuleManager.convertHexToAscii( - event.blockchainId, - eventData.nodeId, - ); + const nodeId = this.blockchainModuleManager.convertHexToAscii( + event.blockchain, + eventData.nodeId, + ); - await this.repositoryModuleManager.updatePeerStake( - nodeId, - event.blockchainId, - this.blockchainModuleManager.convertFromWei( - event.blockchainId, - eventData.newStake, - ), - ); - }), + await this.repositoryModuleManager.updatePeerStake( + nodeId, + event.blockchain, + this.blockchainModuleManager.convertFromWei(event.blockchain, eventData.newStake), ); } - async handleStakeWithdrawalStartedEvents(blockEvents) { - await this.handleStakeIncreasedEvents(blockEvents); + async handleStakeWithdrawalStartedEvent(event) { + await this.handleStakeIncreasedEvent(event); } - async handleAskUpdatedEvents(blockEvents) { - await Promise.all( - blockEvents.map(async (event) => { - const eventData = JSON.parse(event.data); - - const nodeId = this.blockchainModuleManager.convertHexToAscii( - event.blockchainId, - eventData.nodeId, - ); + async handleAskUpdatedEvent(event) { + const eventData = JSON.parse(event.data); - await this.repositoryModuleManager.updatePeerAsk( - nodeId, - event.blockchainId, - this.blockchainModuleManager.convertFromWei(event.blockchainId, eventData.ask), - ); - }), + const nodeId = this.blockchainModuleManager.convertHexToAscii( + event.blockchain, + eventData.nodeId, ); - } - - async handleAssetMintedEvents(blockEvents) { - for (const event of blockEvents) { - const eventData = JSON.parse(event.data); - - const { assetContract, tokenId, state, publishOperationId } = eventData; - const blockchain = event.blockchainId; - // eslint-disable-next-line no-await-in-loop - const operationId = await this.operationIdService.generateOperationId( - OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_START, - ); - - const datasetPath = this.fileService.getPendingStorageDocumentPath(publishOperationId); - - // eslint-disable-next-line no-await-in-loop - const data = await this.fileService.readFile(datasetPath, true); - - const ual = this.ualService.deriveUAL(blockchain, assetContract, tokenId); - - // eslint-disable-next-line no-await-in-loop - await this.commandExecutor.add({ - name: 'validateAssertionMetadataCommand', - sequence: ['storeAssertionCommand'], - delay: 0, - data: { - operationId, - ual, - blockchain, - contract: assetContract, - tokenId, - merkleRoot: state, - assertion: data.assertion, - cachedMerkleRoot: data.merkleRoot, - }, - transactional: false, - }); - } + await this.repositoryModuleManager.updatePeerAsk( + nodeId, + event.blockchain, + this.blockchainModuleManager.convertFromWei(event.blockchain, eventData.ask), + ); } - async handleStateFinalizedEvents(blockEvents) { - // todo: find a way to safely parallelize this - for (const event of blockEvents) { - const eventData = JSON.parse(event.data); - - const { tokenId, keyword, hashFunctionId, state, stateIndex } = eventData; - const blockchain = event.blockchainId; - const contract = eventData.assetContract; - this.logger.trace( - `Handling event: ${event.event} for asset with ual: ${this.ualService.deriveUAL( - blockchain, - contract, - tokenId, - )} with keyword: ${keyword}, assertion id: ${state}.`, - ); + async handleAssetMintedEvent(event) { + const eventData = JSON.parse(event.data); - // eslint-disable-next-line no-await-in-loop - await Promise.all([ - this.pendingStorageService.moveAndDeletePendingState( - TRIPLE_STORE_REPOSITORIES.PUBLIC_CURRENT, - TRIPLE_STORE_REPOSITORIES.PUBLIC_HISTORY, - PENDING_STORAGE_REPOSITORIES.PUBLIC, - blockchain, - contract, - tokenId, - keyword, - hashFunctionId, - state, - stateIndex, - ), - this.pendingStorageService.moveAndDeletePendingState( - TRIPLE_STORE_REPOSITORIES.PRIVATE_CURRENT, - TRIPLE_STORE_REPOSITORIES.PRIVATE_HISTORY, - PENDING_STORAGE_REPOSITORIES.PRIVATE, - blockchain, - contract, - tokenId, - keyword, - hashFunctionId, - state, - stateIndex, - ), - ]); + const { assetContract, tokenId, state, publishOperationId } = eventData; + const { blockchain } = event; - // eslint-disable-next-line no-await-in-loop - const paranetsBlockchains = await this.repositoryModuleManager.getParanetsBlockchains(); - - if (paranetsBlockchains.includes(blockchain)) { - // eslint-disable-next-line no-await-in-loop - const knowledgeAssetId = await this.paranetService.constructKnowledgeAssetId( - blockchain, - contract, - tokenId, - ); + const operationId = await this.operationIdService.generateOperationId( + OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_START, + ); - // eslint-disable-next-line no-await-in-loop - const paranetId = await this.blockchainModuleManager.getParanetId( - blockchain, - knowledgeAssetId, - ); - if (paranetId && paranetId !== ZERO_BYTES32) { - // eslint-disable-next-line no-await-in-loop - const paranetExists = await this.repositoryModuleManager.paranetExists( - paranetId, - blockchain, - ); - if (paranetExists) { - const { - paranetKAStorageContract: paranetKasContract, - tokenId: paranetTokenId, - } = - // eslint-disable-next-line no-await-in-loop - await this.blockchainModuleManager.getKnowledgeAssetLocatorFromParanetId( - blockchain, - paranetId, - ); - const paranetUAL = this.ualService.deriveUAL( - blockchain, - paranetKasContract, - paranetTokenId, - ); - - // eslint-disable-next-line no-await-in-loop - const paranetAssetExists = await this.tripleStoreService.paranetAssetExists( - blockchain, - contract, - tokenId, - paranetKasContract, - paranetTokenId, - ); - - if (paranetAssetExists) { - const kaUAL = this.ualService.deriveUAL(blockchain, contract, tokenId); - - // Create a record for missing Paranet KA - // Paranet sync command will get it from network - // eslint-disable-next-line no-await-in-loop - await this.repositoryModuleManager.createMissedParanetAssetRecord({ - blockchainId: blockchain, - ual: kaUAL, - paranetUal: paranetUAL, - knowledgeAssetId, - }); - } - } - } - } - } + const datasetPath = this.fileService.getPendingStorageDocumentPath(publishOperationId); + + const data = await this.fileService.readFile(datasetPath, true); + + const ual = this.ualService.deriveUAL(blockchain, assetContract, tokenId); + + await this.commandExecutor.add({ + name: 'validateAssertionMetadataCommand', + sequence: ['storeAssertionCommand'], + delay: 0, + data: { + operationId, + ual, + blockchain, + contract: assetContract, + tokenId, + merkleRoot: state, + assertion: data.assertion, + cachedMerkleRoot: data.merkleRoot, + }, + transactional: false, + }); } /** diff --git a/src/commands/blockchain-event-listener/event-listener-command.js b/src/commands/blockchain-event-listener/event-listener-command.js index dbc60aeda..11d9079a6 100644 --- a/src/commands/blockchain-event-listener/event-listener-command.js +++ b/src/commands/blockchain-event-listener/event-listener-command.js @@ -4,6 +4,7 @@ import { NODE_ENVIRONMENTS, ERROR_TYPE, COMMAND_PRIORITY, + MAXIMUM_FETCH_EVENTS_FAILED_COUNT, } from '../../constants/constants.js'; class EventListenerCommand extends Command { @@ -35,6 +36,7 @@ class EventListenerCommand extends Command { return this.commandExecutor.add({ name: 'blockchainEventListenerCommand', data: commandData, + retries: MAXIMUM_FETCH_EVENTS_FAILED_COUNT, transactional: false, priority: COMMAND_PRIORITY.HIGHEST, isBlocking: true, diff --git a/src/commands/command-executor.js b/src/commands/command-executor.js index 19baa61ab..4eadcdf59 100644 --- a/src/commands/command-executor.js +++ b/src/commands/command-executor.js @@ -74,6 +74,7 @@ class CommandExecutor { async _execute(executeCommand) { const command = executeCommand; const now = Date.now(); + await this._update(command, { startedAt: now, }); @@ -82,7 +83,7 @@ class CommandExecutor { commandId: command.id, commandName: command.name, }; - if (command.data?.operationId !== undefined) { + if (command.data?.operationId) { commandContext.operationId = command.data.operationId; } const loggerWithContext = this.logger.child(commandContext); @@ -107,7 +108,7 @@ class CommandExecutor { }); return; } - if (command.deadlineAt !== undefined && now > command.deadlineAt) { + if (command.deadlineAt && now > command.deadlineAt) { loggerWithContext.warn('Command is too late...'); await this._update(command, { status: COMMAND_STATUS.EXPIRED, @@ -283,10 +284,33 @@ class CommandExecutor { */ async add(addCommand, addDelay, insert = true) { let command = addCommand; + + if (command.isBlocking) { + // Check the db to see if there are unfinalized instances of the same command + const unfinalizedBlockingCommands = + await this.repositoryModuleManager.findUnfinalizedCommandsByName(command.name); + + for (const unfinalizedCommand of unfinalizedBlockingCommands) { + if (command.id && command.id === unfinalizedCommand.id) { + if (insert) { + this.logger.warn(`Inserting duplicate of command ${command.id}!`); + } + continue; + } + + if (JSON.stringify(unfinalizedCommand.data) === JSON.stringify(command.data)) { + this.logger.info( + `Skipping blocking command: ${command.name} because of unfinalized instance of this command with id: ${unfinalizedCommand.id}`, + ); + return; + } + } + } + let delay = addDelay ?? 0; if (delay > MAX_COMMAND_DELAY_IN_MILLS) { - if (command.readyAt === undefined) { + if (!command.readyAt) { command.readyAt = Date.now(); } command.readyAt += delay; @@ -320,7 +344,7 @@ class CommandExecutor { */ async _handleRetry(retryCommand, handler) { const command = retryCommand; - if (command.retries !== undefined && command.retries > 1) { + if (command.retries && command.retries > 1) { command.data = handler.pack(command.data); await this._update(command, { status: COMMAND_STATUS.PENDING, @@ -346,7 +370,7 @@ class CommandExecutor { * @private */ async _handleError(command, handler, error) { - if (command.retries !== undefined && command.retries > 0) { + if (command.retries && command.retries > 0) { await this._update(command, { retries: command.retries - 1, }); @@ -387,6 +411,7 @@ class CommandExecutor { command.delay = command.delay ?? 0; command.transactional = command.transactional ?? 0; command.priority = command.priority ?? DEFAULT_COMMAND_PRIORITY; + command.isBlocking = command.isBlocking ?? false; command.status = COMMAND_STATUS.PENDING; if (!command.data) { @@ -449,6 +474,11 @@ class CommandExecutor { const commands = []; for (const command of pendingCommands) { + if (command?.isBlocking) { + commands.push(command); + continue; + } + if (!command?.parentId) { continue; } @@ -470,6 +500,7 @@ class CommandExecutor { name: commandModel.name, data: commandModel.data, priority: commandModel.priority ?? DEFAULT_COMMAND_PRIORITY, + isBlocking: commandModel.isBlocking ?? false, readyAt: commandModel.readyAt, delay: commandModel.delay, startedAt: commandModel.startedAt, diff --git a/src/constants/constants.js b/src/constants/constants.js index 8ce688546..dc6b8dd18 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -1,4 +1,5 @@ import { BigNumber, ethers } from 'ethers'; +import { createRequire } from 'module'; export const WS_RPC_PROVIDER_PRIORITY = 2; @@ -229,6 +230,37 @@ export const TRANSACTION_PRIORITY = { LOWEST: 20, }; +const require = createRequire(import.meta.url); + +export const ABIs = { + ContentAsset: require('dkg-evm-module/abi/ContentAssetV2.json'), + ContentAssetStorage: require('dkg-evm-module/abi/ContentAssetStorageV2.json'), + AssertionStorage: require('dkg-evm-module/abi/AssertionStorage.json'), + Staking: require('dkg-evm-module/abi/Staking.json'), + StakingStorage: require('dkg-evm-module/abi/StakingStorage.json'), + Token: require('dkg-evm-module/abi/Token.json'), + HashingProxy: require('dkg-evm-module/abi/HashingProxy.json'), + Hub: require('dkg-evm-module/abi/Hub.json'), + IdentityStorage: require('dkg-evm-module/abi/IdentityStorage.json'), + Log2PLDSF: require('dkg-evm-module/abi/Log2PLDSF.json'), + ParametersStorage: require('dkg-evm-module/abi/ParametersStorage.json'), + Profile: require('dkg-evm-module/abi/Profile.json'), + ProfileStorage: require('dkg-evm-module/abi/ProfileStorage.json'), + ScoringProxy: require('dkg-evm-module/abi/ScoringProxy.json'), + ServiceAgreementV1: require('dkg-evm-module/abi/ServiceAgreementV1.json'), + CommitManagerV1: require('dkg-evm-module/abi/CommitManagerV2.json'), + CommitManagerV1U1: require('dkg-evm-module/abi/CommitManagerV2U1.json'), + ProofManagerV1: require('dkg-evm-module/abi/ProofManagerV1.json'), + ProofManagerV1U1: require('dkg-evm-module/abi/ProofManagerV1U1.json'), + ShardingTable: require('dkg-evm-module/abi/ShardingTableV2.json'), + ShardingTableStorage: require('dkg-evm-module/abi/ShardingTableStorageV2.json'), + ServiceAgreementStorageProxy: require('dkg-evm-module/abi/ServiceAgreementStorageProxy.json'), + UnfinalizedStateStorage: require('dkg-evm-module/abi/UnfinalizedStateStorage.json'), + LinearSum: require('dkg-evm-module/abi/LinearSum.json'), + ParanetsRegistry: require('dkg-evm-module/abi/ParanetsRegistry.json'), + ParanetKnowledgeAssetsRegistry: require('dkg-evm-module/abi/ParanetKnowledgeAssetsRegistry.json'), +}; + export const CONTRACT_FUNCTION_PRIORITY = { 'submitCommit((address,uint256,bytes,uint8,uint16,uint72,uint72,uint72))': TRANSACTION_PRIORITY.MEDIUM, @@ -722,46 +754,59 @@ export const LOCAL_STORE_TYPES = { /** * Contract names - * @type {{SHARDING_TABLE_CONTRACT: string}} + * @type {{SHARDING_TABLE: string}} */ export const CONTRACTS = { - CONTENT_ASSET_CONTRACT: 'ContentAssetContract', - SHARDING_TABLE_CONTRACT: 'ShardingTableContract', - STAKING_CONTRACT: 'StakingContract', - PROFILE_CONTRACT: 'ProfileContract', - HUB_CONTRACT: 'HubContract', - CONTENT_ASSET: 'ContentAssetContract', - COMMIT_MANAGER_V1_U1_CONTRACT: 'CommitManagerV1U1Contract', - PARAMETERS_STORAGE_CONTRACT: 'ParametersStorageContract', - IDENTITY_STORAGE_CONTRACT: 'IdentityStorageContract', - LOG2PLDSF_CONTRACT: 'Log2PLDSFContract', - LINEAR_SUM_CONTRACT: 'LinearSumContract', - PARANETS_REGISTRY_CONTRACT: 'ParanetsRegistry', -}; - -export const CONTRACT_EVENTS = { - HUB: ['NewContract', 'ContractChanged', 'NewAssetStorage', 'AssetStorageChanged'], - SHARDING_TABLE: ['NodeAdded', 'NodeRemoved'], - STAKING: ['StakeIncreased', 'StakeWithdrawalStarted'], - PROFILE: ['AskUpdated'], - CONTENT_ASSET: ['AssetMinted'], - COMMIT_MANAGER_V1: ['StateFinalized'], - PARAMETERS_STORAGE: ['ParameterChanged'], - LOG2PLDSF: ['ParameterChanged'], - LINEAR_SUM: ['ParameterChanged'], -}; - -export const GROUPED_CONTRACT_EVENTS = {}; - -export const CONTRACT_EVENT_TO_GROUP_MAPPING = (() => { - const mapping = {}; - Object.entries(GROUPED_CONTRACT_EVENTS).forEach(([groupName, { events }]) => { - events.forEach((eventName) => { - mapping[eventName] = groupName; - }); - }); - return mapping; -})(); + SHARDING_TABLE: 'ShardingTable', + STAKING: 'Staking', + PROFILE: 'Profile', + HUB: 'Hub', + CONTENT_ASSET: 'ContentAsset', + COMMIT_MANAGER_V1_U1: 'CommitManagerV1U1', + PARAMETERS_STORAGE: 'ParametersStorage', + IDENTITY_STORAGE: 'IdentityStorage', + LOG2PLDSF: 'Log2PLDSF', + LINEAR_SUM: 'LinearSum', + PARANETS_REGISTRY: 'ParanetsRegistry', +}; + +export const SHARDING_TABLE_RELATED_EVENTS = { + ShardingTable: ['NodeAdded', 'NodeRemoved'], + Staking: ['StakeIncreased', 'StakeWithdrawalStarted'], + Profile: ['AskUpdated'], +}; + +export const CONTRACTS_EVENTS = { + HUB: { + contract: 'Hub', + events: ['NewContract', 'ContractChanged', 'NewAssetStorage', 'AssetStorageChanged'], + }, + CONTENT_ASSET: { contract: 'ContentAsset', events: ['AssetMinted'] }, + PARAMETERS_STORAGE: { contract: 'ParametersStorage', events: ['ParameterChanged'] }, + SHARDING_TABLE: { contract: 'ShardingTable', events: ['NodeAdded', 'NodeRemoved'] }, + STAKING: { contract: 'Staking', events: ['StakeIncreased', 'StakeWithdrawalStarted'] }, + PROFILE: { contract: 'Profile', events: ['AskUpdated'] }, +}; + +export const CONTRACTS_EVENTS_LISTENED = Object.values(CONTRACTS_EVENTS).map( + (value) => value.contract, +); + +export const BLOCKCHAIN_EVENT_PRIORITIES = { + NewContract: 1, + ContractChanged: 1, + NewAssetStorage: 1, + AssetStorageChanged: 1, + NodeAdded: 2, + NodeRemoved: 2, + ParameterChanged: 2, + StakeIncreased: 3, + StakeWithdrawalStarted: 3, + AskUpdated: 3, + AssetMinted: 4, +}; + +export const DEFAULT_BLOCKCHAIN_EVENT_PRIORITY = Infinity; export const NODE_ENVIRONMENTS = { DEVELOPMENT: 'development', @@ -773,8 +818,6 @@ export const NODE_ENVIRONMENTS = { export const MAXIMUM_FETCH_EVENTS_FAILED_COUNT = 1000; -export const DELAY_BETWEEN_FAILED_FETCH_EVENTS_MILLIS = 10 * 1000; - export const CONTRACT_EVENT_FETCH_INTERVALS = { MAINNET: 10 * 1000, DEVELOPMENT: 4 * 1000, diff --git a/src/modules/blockchain-events/blockchain-events-module-manager.js b/src/modules/blockchain-events/blockchain-events-module-manager.js index 90b1e35e3..313f6ad4d 100644 --- a/src/modules/blockchain-events/blockchain-events-module-manager.js +++ b/src/modules/blockchain-events/blockchain-events-module-manager.js @@ -10,7 +10,6 @@ class BlockchainEventsModuleManager extends BaseModuleManager { async getPastEvents( implementationName, blockchain, - contract, contractName, eventsToFilter, lastCheckedBlock, @@ -20,7 +19,6 @@ class BlockchainEventsModuleManager extends BaseModuleManager { if (this.getImplementation(implementationName)) { return this.getImplementation(implementationName).module.getPastEvents( blockchain, - contract, contractName, eventsToFilter, lastCheckedBlock, diff --git a/src/modules/blockchain-events/implementation/ot-ethers/ot-ethers.js b/src/modules/blockchain-events/implementation/ot-ethers/ot-ethers.js index d1bf45afd..5901723d8 100644 --- a/src/modules/blockchain-events/implementation/ot-ethers/ot-ethers.js +++ b/src/modules/blockchain-events/implementation/ot-ethers/ot-ethers.js @@ -1,5 +1,5 @@ /* eslint-disable no-await-in-loop */ -import ethers from 'ethers'; +import { ethers } from 'ethers'; import BlockchainEventsService from '../blockchain-events-service.js'; import { @@ -11,16 +11,21 @@ import { MAX_BLOCKCHAIN_EVENT_SYNC_OF_HISTORICAL_BLOCKS_IN_MILLS, NODE_ENVIRONMENTS, BLOCK_TIME_MILLIS, + ABIs, + CONTRACTS_EVENTS_LISTENED, } from '../../../../constants/constants.js'; class OtEthers extends BlockchainEventsService { async initialize(config, logger) { await super.initialize(config, logger); - await this.initializeRpcProviders(); + this.contractCallCache = {}; + await this._initializeRpcProviders(); + await this._initializeContracts(); } - async initializeRpcProviders() { + async _initializeRpcProviders() { this.providers = {}; + for (const blockchain of this.config.blockchains) { const providers = []; for (const rpcEndpoint of this.config.rpcEndpoints[blockchain]) { @@ -61,11 +66,19 @@ class OtEthers extends BlockchainEventsService { } this.logger.debug( - `Connected to the blockchain RPC: ${this.maskRpcUrl(rpcEndpoint)}.`, + `Connected to the blockchain RPC: ${ + rpcEndpoint.includes('apiKey') + ? rpcEndpoint.split('apiKey')[0] + : rpcEndpoint + }.`, ); } catch (e) { this.logger.warn( - `Unable to connect to the blockchain RPC: ${this.maskRpcUrl(rpcEndpoint)}.`, + `Unable to connect to the blockchain RPC: ${ + rpcEndpoint.includes('apiKey') + ? rpcEndpoint.split('apiKey')[0] + : rpcEndpoint + }.`, ); } } @@ -86,25 +99,49 @@ class OtEthers extends BlockchainEventsService { } } - maskRpcUrl(url) { - if (url.includes('apiKey')) { - return url.split('apiKey')[0]; + async _initializeContracts() { + this.contracts = {}; + + for (const blockchain of this.config.blockchains) { + this.contracts[blockchain] = {}; + + this.logger.info( + `Initializing contracts with hub contract address: ${this.config.hubContractAddress[blockchain]}`, + ); + this.contracts[blockchain].Hub = new ethers.Contract( + this.config.hubContractAddress[blockchain], + ABIs.Hub, + this.providers[blockchain], + ); + + const contractsAray = await this.contracts[blockchain].Hub.getAllContracts(); + const assetStoragesArray = await this.contracts[blockchain].Hub.getAllAssetStorages(); + + const allContracts = [...contractsAray, ...assetStoragesArray]; + + for (const [contractName, contractAddress] of allContracts) { + if ( + CONTRACTS_EVENTS_LISTENED.includes(contractName) && + ABIs[contractName] != null + ) { + this.contracts[blockchain][contractName] = new ethers.Contract( + contractAddress, + ABIs[contractName], + this.providers[blockchain], + ); + } + } + + this.logger.info(`Contracts initialized`); } - return url; } - async getBlock(blockchain, tag = 'latest') { + async getBlock(blockchain, tag) { return this.providers[blockchain].getBlock(tag); } - async getPastEvents( - blockchain, - contractName, - contract, - eventsToFilter, - lastCheckedBlock, - currentBlock, - ) { + async getPastEvents(blockchain, contractName, eventsToFilter, lastCheckedBlock, currentBlock) { + const contract = this.contracts[blockchain][contractName]; if (!contract) { // this will happen when we have different set of contracts on different blockchains // eg LinearSum contract is available on gnosis but not on NeuroWeb, so the node should not fetch events @@ -116,17 +153,10 @@ class OtEthers extends BlockchainEventsService { }; } - let fromBlock; - let eventsMissed = false; - if ( - currentBlock - lastCheckedBlock > - (await this._getMaxNumberOfHistoricalBlocksForSync(blockchain)) - ) { - fromBlock = currentBlock; - eventsMissed = true; - } else { - fromBlock = lastCheckedBlock + 1; - } + const maxBlocksToSync = await this._getMaxNumberOfHistoricalBlocksForSync(blockchain); + let fromBlock = + currentBlock - lastCheckedBlock > maxBlocksToSync ? currentBlock : lastCheckedBlock + 1; + const eventsMissed = currentBlock - lastCheckedBlock > maxBlocksToSync; const topics = []; for (const filterName in contract.filters) { @@ -151,7 +181,7 @@ class OtEthers extends BlockchainEventsService { contract, topics, ); - newEvents.forEach((e) => events.push(...e)); + events.push(...newEvents.flat()); fromBlock = toBlock + 1; } } catch (error) { @@ -199,7 +229,7 @@ class OtEthers extends BlockchainEventsService { } const latestBlock = await this.getBlock(blockchain); - const olderBlock = await this.getBlock(latestBlock.number - blockRange); + const olderBlock = await this.getBlock(blockchain, latestBlock.number - blockRange); const timeDiffMillis = (latestBlock.timestamp - olderBlock.timestamp) * 1000; return timeDiffMillis / blockRange; diff --git a/src/modules/blockchain/blockchain-module-manager.js b/src/modules/blockchain/blockchain-module-manager.js index e3eedb79b..17622af65 100644 --- a/src/modules/blockchain/blockchain-module-manager.js +++ b/src/modules/blockchain/blockchain-module-manager.js @@ -510,10 +510,6 @@ class BlockchainModuleManager extends BaseModuleManager { knowledgeAssetId, ]); } - - getContract(blockchain, contractName) { - return this.callImplementationFunction(blockchain, 'getContract', [contractName]); - } } export default BlockchainModuleManager; diff --git a/src/modules/blockchain/implementation/web3-service.js b/src/modules/blockchain/implementation/web3-service.js index fb914771a..5bfcae35c 100644 --- a/src/modules/blockchain/implementation/web3-service.js +++ b/src/modules/blockchain/implementation/web3-service.js @@ -3,7 +3,6 @@ import { ethers, BigNumber } from 'ethers'; import axios from 'axios'; import async from 'async'; import { setTimeout as sleep } from 'timers/promises'; -import { createRequire } from 'module'; import { SOLIDITY_ERROR_STRING_PREFIX, @@ -23,40 +22,10 @@ import { CONTRACT_FUNCTION_PRIORITY, TRANSACTION_PRIORITY, CONTRACT_FUNCTION_GAS_LIMIT_INCREASE_FACTORS, + ABIs, } from '../../../constants/constants.js'; import Web3ServiceValidator from './web3-service-validator.js'; -const require = createRequire(import.meta.url); - -const ABIs = { - ContentAsset: require('dkg-evm-module/abi/ContentAssetV2.json'), - ContentAssetStorage: require('dkg-evm-module/abi/ContentAssetStorageV2.json'), - AssertionStorage: require('dkg-evm-module/abi/AssertionStorage.json'), - Staking: require('dkg-evm-module/abi/Staking.json'), - StakingStorage: require('dkg-evm-module/abi/StakingStorage.json'), - Token: require('dkg-evm-module/abi/Token.json'), - HashingProxy: require('dkg-evm-module/abi/HashingProxy.json'), - Hub: require('dkg-evm-module/abi/Hub.json'), - IdentityStorage: require('dkg-evm-module/abi/IdentityStorage.json'), - Log2PLDSF: require('dkg-evm-module/abi/Log2PLDSF.json'), - ParametersStorage: require('dkg-evm-module/abi/ParametersStorage.json'), - Profile: require('dkg-evm-module/abi/Profile.json'), - ProfileStorage: require('dkg-evm-module/abi/ProfileStorage.json'), - ScoringProxy: require('dkg-evm-module/abi/ScoringProxy.json'), - ServiceAgreementV1: require('dkg-evm-module/abi/ServiceAgreementV1.json'), - CommitManagerV1: require('dkg-evm-module/abi/CommitManagerV2.json'), - CommitManagerV1U1: require('dkg-evm-module/abi/CommitManagerV2U1.json'), - ProofManagerV1: require('dkg-evm-module/abi/ProofManagerV1.json'), - ProofManagerV1U1: require('dkg-evm-module/abi/ProofManagerV1U1.json'), - ShardingTable: require('dkg-evm-module/abi/ShardingTableV2.json'), - ShardingTableStorage: require('dkg-evm-module/abi/ShardingTableStorageV2.json'), - ServiceAgreementStorageProxy: require('dkg-evm-module/abi/ServiceAgreementStorageProxy.json'), - UnfinalizedStateStorage: require('dkg-evm-module/abi/UnfinalizedStateStorage.json'), - LinearSum: require('dkg-evm-module/abi/LinearSum.json'), - ParanetsRegistry: require('dkg-evm-module/abi/ParanetsRegistry.json'), - ParanetKnowledgeAssetsRegistry: require('dkg-evm-module/abi/ParanetKnowledgeAssetsRegistry.json'), -}; - const SCORING_FUNCTIONS = { 1: 'Log2PLDSF', 2: 'LinearSum', @@ -232,25 +201,22 @@ class Web3Service { } } - getABIs() { - return ABIs; - } - async initializeContracts() { + this.contracts = {}; this.contractAddresses = {}; this.logger.info( `Initializing contracts with hub contract address: ${this.config.hubContractAddress}`, ); - this.HubContract = new ethers.Contract( + this.contracts.Hub = new ethers.Contract( this.config.hubContractAddress, - this.getABIs().Hub, + ABIs.Hub, this.operationalWallets[0], ); - this.contractAddresses[this.config.hubContractAddress] = this.HubContract; + this.contractAddresses[this.config.hubContractAddress] = this.contracts.Hub; const contractsArray = await this.callContractFunction( - this.HubContract, + this.contracts.Hub, 'getAllContracts', [], ); @@ -261,7 +227,7 @@ class Web3Service { this.scoringFunctionsContracts = {}; const scoringFunctionsArray = await this.callContractFunction( - this.ScoringProxyContract, + this.contracts.ScoringProxy, 'getAllScoreFunctions', [], ); @@ -271,7 +237,7 @@ class Web3Service { this.assetStorageContracts = {}; const assetStoragesArray = await this.callContractFunction( - this.HubContract, + this.contracts.Hub, 'getAllAssetStorages', [], ); @@ -351,7 +317,7 @@ class Web3Service { initializeAssetStorageContract(assetStorageAddress) { this.assetStorageContracts[assetStorageAddress.toLowerCase()] = new ethers.Contract( assetStorageAddress, - this.getABIs().ContentAssetStorage, + ABIs.ContentAssetStorage, this.operationalWallets[0], ); this.contractAddresses[assetStorageAddress] = @@ -361,17 +327,13 @@ class Web3Service { initializeScoringContract(id, contractAddress) { const contractName = SCORING_FUNCTIONS[id]; - if (this.getABIs()[contractName] != null) { + if (ABIs[contractName] != null) { this.scoringFunctionsContracts[id] = new ethers.Contract( contractAddress, - this.getABIs()[contractName], + ABIs[contractName], this.operationalWallets[0], ); this.contractAddresses[contractAddress] = this.scoringFunctionsContracts[id]; - } else { - this.logger.trace( - `Skipping initialisation of contract with id: ${id}, address: ${contractAddress}`, - ); } } @@ -402,17 +364,13 @@ class Web3Service { } initializeContract(contractName, contractAddress) { - if (this.getABIs()[contractName] != null) { - this[`${contractName}Contract`] = new ethers.Contract( + if (ABIs[contractName] != null) { + this.contracts[contractName] = new ethers.Contract( contractAddress, - this.getABIs()[contractName], + ABIs[contractName], this.operationalWallets[0], ); - this.contractAddresses[contractAddress] = this[`${contractName}Contract`]; - } else { - this.logger.trace( - `Skipping initialisation of contract: ${contractName}, address: ${contractAddress}`, - ); + this.contractAddresses[contractAddress] = this.contracts[contractName]; } } @@ -446,7 +404,7 @@ class Web3Service { } async getTokenBalance(publicKey) { - const tokenBalance = await this.callContractFunction(this.TokenContract, 'balanceOf', [ + const tokenBalance = await this.callContractFunction(this.contracts.Token, 'balanceOf', [ publicKey, ]); return Number(ethers.utils.formatEther(tokenBalance)); @@ -464,10 +422,10 @@ class Web3Service { const promises = this.operationalWallets.map((wallet) => this.callContractFunction( - this.IdentityStorageContract, + this.contracts.IdentityStorage, 'getIdentityId', [wallet.address], - CONTRACTS.IDENTITY_STORAGE_CONTRACT, + CONTRACTS.IDENTITY_STORAGE, ).then((identityId) => [wallet.address, Number(identityId)]), ); const results = await Promise.all(promises); @@ -542,7 +500,7 @@ class Web3Service { try { // eslint-disable-next-line no-await-in-loop await this._executeContractFunction( - this.ProfileContract, + this.contracts.Profile, 'createProfile', [ this.getManagementKey(), @@ -909,26 +867,26 @@ class Web3Service { } async isAssetStorageContract(contractAddress) { - return this.callContractFunction(this.HubContract, 'isAssetStorage(address)', [ + return this.callContractFunction(this.contracts.Hub, 'isAssetStorage(address)', [ contractAddress, ]); } async getMinProofWindowOffsetPerc() { return this.callContractFunction( - this.ParametersStorageContract, + this.contracts.ParametersStorage, 'minProofWindowOffsetPerc', [], - CONTRACTS.PARAMETERS_STORAGE_CONTRACT, + CONTRACTS.PARAMETERS_STORAGE, ); } async getMaxProofWindowOffsetPerc() { return this.callContractFunction( - this.ParametersStorageContract, + this.contracts.ParametersStorage, 'maxProofWindowOffsetPerc', [], - CONTRACTS.PARAMETERS_STORAGE_CONTRACT, + CONTRACTS.PARAMETERS_STORAGE, ); } @@ -1007,7 +965,7 @@ class Web3Service { async getUnfinalizedState(tokenId) { return this.callContractFunction( - this.UnfinalizedStateStorageContract, + this.contracts.UnfinalizedStateStorage, 'getUnfinalizedState', [tokenId], ); @@ -1015,7 +973,7 @@ class Web3Service { async getAgreementData(agreementId) { const result = await this.callContractFunction( - this.ServiceAgreementStorageProxyContract, + this.contracts.ServiceAgreementStorageProxy, 'getAgreementData', [agreementId], ); @@ -1035,7 +993,7 @@ class Web3Service { async getAssertionSize(assertionId) { const assertionSize = await this.callContractFunction( - this.AssertionStorageContract, + this.contracts.AssertionStorage, 'getAssertionSize', [assertionId], ); @@ -1044,7 +1002,7 @@ class Web3Service { async getAssertionTriplesNumber(assertionId) { const assertionTriplesNumber = await this.callContractFunction( - this.AssertionStorageContract, + this.contracts.AssertionStorage, 'getAssertionTriplesNumber', [assertionId], ); @@ -1053,7 +1011,7 @@ class Web3Service { async getAssertionChunksNumber(assertionId) { const assertionChunksNumber = await this.callContractFunction( - this.AssertionStorageContract, + this.contracts.AssertionStorage, 'getAssertionChunksNumber', [assertionId], ); @@ -1062,7 +1020,7 @@ class Web3Service { async getAssertionData(assertionId) { const assertionData = await this.callContractFunction( - this.AssertionStorageContract, + this.contracts.AssertionStorage, 'getAssertion', [assertionId], ); @@ -1076,8 +1034,8 @@ class Web3Service { selectCommitManagerContract(latestStateIndex) { return latestStateIndex === 0 - ? this.CommitManagerV1Contract - : this.CommitManagerV1U1Contract; + ? this.contracts.CommitManagerV1 + : this.contracts.CommitManagerV1U1; } async isCommitWindowOpen(agreementId, epoch, latestStateIndex) { @@ -1090,7 +1048,7 @@ class Web3Service { async isUpdateCommitWindowOpen(agreementId, epoch, stateIndex) { return this.callContractFunction( - this.CommitManagerV1U1Contract, + this.contracts.CommitManagerV1U1, 'isUpdateCommitWindowOpen', [agreementId, epoch, stateIndex], ); @@ -1118,10 +1076,10 @@ class Web3Service { async getMinimumStake() { const minimumStake = await this.callContractFunction( - this.ParametersStorageContract, + this.contracts.ParametersStorage, 'minimumStake', [], - CONTRACTS.PARAMETERS_STORAGE_CONTRACT, + CONTRACTS.PARAMETERS_STORAGE, ); return Number(ethers.utils.formatEther(minimumStake)); @@ -1129,10 +1087,10 @@ class Web3Service { async getMaximumStake() { const maximumStake = await this.callContractFunction( - this.ParametersStorageContract, + this.contracts.ParametersStorage, 'maximumStake', [], - CONTRACTS.PARAMETERS_STORAGE_CONTRACT, + CONTRACTS.PARAMETERS_STORAGE, ); return Number(ethers.utils.formatEther(maximumStake)); @@ -1140,40 +1098,40 @@ class Web3Service { async getR2() { const r2 = await this.callContractFunction( - this.ParametersStorageContract, + this.contracts.ParametersStorage, 'r2', [], - CONTRACTS.PARAMETERS_STORAGE_CONTRACT, + CONTRACTS.PARAMETERS_STORAGE, ); return r2; } async getR1() { const r1 = await this.callContractFunction( - this.ParametersStorageContract, + this.contracts.ParametersStorage, 'r1', [], - CONTRACTS.PARAMETERS_STORAGE_CONTRACT, + CONTRACTS.PARAMETERS_STORAGE, ); return r1; } async getR0() { const r0 = await this.callContractFunction( - this.ParametersStorageContract, + this.contracts.ParametersStorage, 'r0', [], - CONTRACTS.PARAMETERS_STORAGE_CONTRACT, + CONTRACTS.PARAMETERS_STORAGE, ); return r0; } async getFinalizationCommitsNumber() { const finalizationCommitsNumber = await this.callContractFunction( - this.ParametersStorageContract, + this.contracts.ParametersStorage, 'finalizationCommitsNumber', [], - CONTRACTS.PARAMETERS_STORAGE_CONTRACT, + CONTRACTS.PARAMETERS_STORAGE, ); return finalizationCommitsNumber; } @@ -1235,7 +1193,7 @@ class Web3Service { 'submitUpdateCommit((address,uint256,bytes,uint8,uint16,uint72,uint72,uint72))'; } return this.queueTransaction( - this.CommitManagerV1U1Contract, + this.contracts.CommitManagerV1U1, functionName, [submitCommitArgs], callback, @@ -1244,7 +1202,9 @@ class Web3Service { } selectProofManagerContract(latestStateIndex) { - return latestStateIndex === 0 ? this.ProofManagerV1Contract : this.ProofManagerV1U1Contract; + return latestStateIndex === 0 + ? this.contracts.ProofManagerV1 + : this.contracts.ProofManagerV1U1; } async isProofWindowOpen(agreementId, epoch, latestStateIndex) { @@ -1292,12 +1252,12 @@ class Web3Service { } async getShardingTableHead() { - return this.callContractFunction(this.ShardingTableStorageContract, 'head', []); + return this.callContractFunction(this.contracts.ShardingTableStorage, 'head', []); } async getShardingTableLength() { const nodesCount = await this.callContractFunction( - this.ShardingTableStorageContract, + this.contracts.ShardingTableStorage, 'nodesCount', [], ); @@ -1306,7 +1266,7 @@ class Web3Service { async getShardingTablePage(startingIdentityId, nodesNum) { return this.callContractFunction( - this.ShardingTableContract, + this.contracts.ShardingTable, 'getShardingTable(uint72,uint72)', [startingIdentityId, nodesNum], ); @@ -1374,45 +1334,45 @@ class Web3Service { async getUpdateCommitWindowDuration() { const commitWindowDurationPerc = await this.callContractFunction( - this.ParametersStorageContract, + this.contracts.ParametersStorage, 'updateCommitWindowDuration', [], - CONTRACTS.PARAMETERS_STORAGE_CONTRACT, + CONTRACTS.PARAMETERS_STORAGE, ); return Number(commitWindowDurationPerc); } async getCommitWindowDurationPerc() { const commitWindowDurationPerc = await this.callContractFunction( - this.ParametersStorageContract, + this.contracts.ParametersStorage, 'commitWindowDurationPerc', [], - CONTRACTS.PARAMETERS_STORAGE_CONTRACT, + CONTRACTS.PARAMETERS_STORAGE, ); return Number(commitWindowDurationPerc); } async getProofWindowDurationPerc() { return this.callContractFunction( - this.ParametersStorageContract, + this.contracts.ParametersStorage, 'proofWindowDurationPerc', [], - CONTRACTS.PARAMETERS_STORAGE_CONTRACT, + CONTRACTS.PARAMETERS_STORAGE, ); } async getEpochLength() { const epochLength = await this.callContractFunction( - this.ParametersStorageContract, + this.contracts.ParametersStorage, 'epochLength', [], - CONTRACTS.PARAMETERS_STORAGE_CONTRACT, + CONTRACTS.PARAMETERS_STORAGE, ); return Number(epochLength); } async isHashFunction(hashFunctionId) { - return this.callContractFunction(this.HashingProxyContract, 'isHashFunction(uint8)', [ + return this.callContractFunction(this.contracts.HashingProxy, 'isHashFunction(uint8)', [ hashFunctionId, ]); } @@ -1426,7 +1386,7 @@ class Web3Service { this.scoringFunctionsContracts[1], 'getParameters', [], - CONTRACTS.LOG2PLDSF_CONTRACT, + CONTRACTS.LOG2PLDSF, ); const params = {}; @@ -1461,14 +1421,16 @@ class Web3Service { } async hasPendingUpdate(tokenId) { - return this.callContractFunction(this.UnfinalizedStateStorageContract, 'hasPendingUpdate', [ - tokenId, - ]); + return this.callContractFunction( + this.contracts.UnfinalizedStateStorage, + 'hasPendingUpdate', + [tokenId], + ); } async getAgreementScoreFunctionId(agreementId) { return this.callContractFunction( - this.ServiceAgreementStorageProxyContract, + this.contracts.ServiceAgreementStorageProxy, 'getAgreementScoreFunctionId', [agreementId], ); @@ -1479,7 +1441,7 @@ class Web3Service { this.scoringFunctionsContracts[2], 'getParameters', [], - CONTRACTS.LINEAR_SUM_CONTRACT, + CONTRACTS.LINEAR_SUM, ); return { distanceScaleFactor: BigNumber.from(linearSumParams[0]), @@ -1491,52 +1453,52 @@ class Web3Service { async getParanetKnowledgeAssetsCount(paranetId) { return this.callContractFunction( - this.ParanetsRegistryContract, + this.contracts.ParanetsRegistry, 'getKnowledgeAssetsCount', [paranetId], - CONTRACTS.PARANETS_REGISTRY_CONTRACT, + CONTRACTS.PARANETS_REGISTRY, ); } async getParanetKnowledgeAssetsWithPagination(paranetId, offset, limit) { return this.callContractFunction( - this.ParanetsRegistryContract, + this.contracts.ParanetsRegistry, 'getKnowledgeAssetsWithPagination', [paranetId, offset, limit], - CONTRACTS.PARANETS_REGISTRY_CONTRACT, + CONTRACTS.PARANETS_REGISTRY, ); } async getParanetMetadata(paranetId) { return this.callContractFunction( - this.ParanetsRegistryContract, + this.contracts.ParanetsRegistry, 'getParanetMetadata', [paranetId], - CONTRACTS.PARANETS_REGISTRY_CONTRACT, + CONTRACTS.PARANETS_REGISTRY, ); } async getName(paranetId) { return this.callContractFunction( - this.ParanetsRegistryContract, + this.contracts.ParanetsRegistry, 'getName', [paranetId], - CONTRACTS.PARANETS_REGISTRY_CONTRACT, + CONTRACTS.PARANETS_REGISTRY, ); } async getDescription(paranetId) { return this.callContractFunction( - this.ParanetsRegistryContract, + this.contracts.ParanetsRegistry, 'getDescription', [paranetId], - CONTRACTS.PARANETS_REGISTRY_CONTRACT, + CONTRACTS.PARANETS_REGISTRY, ); } async getParanetKnowledgeAssetLocator(knowledgeAssetId) { const [knowledgeAssetStorageContract, kaTokenId] = await this.callContractFunction( - this.ParanetKnowledgeAssetsRegistryContract, + this.contracts.ParanetKnowledgeAssetsRegistry, 'getKnowledgeAssetLocator', [knowledgeAssetId], ); @@ -1547,7 +1509,7 @@ class Web3Service { async getKnowledgeAssetLocatorFromParanetId(paranetId) { const [paranetKAStorageContract, paranetKATokenId] = await this.callContractFunction( - this.ParanetsRegistryContract, + this.contracts.ParanetsRegistry, 'getParanetKnowledgeAssetLocator', [paranetId], ); @@ -1558,16 +1520,16 @@ class Web3Service { async paranetExists(paranetId) { return this.callContractFunction( - this.ParanetsRegistryContract, + this.contracts.ParanetsRegistry, 'paranetExists', [paranetId], - CONTRACTS.PARANETS_REGISTRY_CONTRACT, + CONTRACTS.PARANETS_REGISTRY, ); } async getParanetId(knowledgeAssetId) { return this.callContractFunction( - this.ParanetKnowledgeAssetsRegistryContract, + this.contracts.ParanetKnowledgeAssetsRegistry, 'getParanetId', [knowledgeAssetId], ); @@ -1575,58 +1537,46 @@ class Web3Service { async isParanetKnowledgeAsset(knowledgeAssetId) { return this.callContractFunction( - this.ParanetKnowledgeAssetsRegistryContract, + this.contracts.ParanetKnowledgeAssetsRegistry, 'isParanetKnowledgeAsset', [knowledgeAssetId], ); } async isCuratedNode(paranetId, identityId) { - return this.callContractFunction(this.ParanetsRegistryContract, 'isCuratedNode', [ + return this.callContractFunction(this.contracts.ParanetsRegistry, 'isCuratedNode', [ paranetId, identityId, ]); } async getNodesAccessPolicy(paranetId) { - return this.callContractFunction(this.ParanetsRegistryContract, 'getNodesAccessPolicy', [ + return this.callContractFunction(this.contracts.ParanetsRegistry, 'getNodesAccessPolicy', [ paranetId, ]); } async getParanetCuratedNodes(paranetId) { return this.callContractFunction( - this.ParanetsRegistryContract, + this.contracts.ParanetsRegistry, 'getCuratedNodes', [paranetId], - CONTRACTS.PARANETS_REGISTRY_CONTRACT, + CONTRACTS.PARANETS_REGISTRY, ); } async getNodeId(identityId) { - return this.callContractFunction(this.ProfileStorageContract, 'getNodeId', [identityId]); + return this.callContractFunction(this.contracts.ProfileStorage, 'getNodeId', [identityId]); } async isKnowledgeAssetRegistered(paranetId, knowledgeAssetId) { return this.callContractFunction( - this.ParanetsRegistryContract, + this.contracts.ParanetsRegistry, 'isKnowledgeAssetRegistered', [paranetId, knowledgeAssetId], - CONTRACTS.PARANETS_REGISTRY_CONTRACT, + CONTRACTS.PARANETS_REGISTRY, ); } - - getContract(contractName) { - const finalContractName = contractName.endsWith('Contract') - ? contractName - : `${contractName}Contract`; - - const contract = this[finalContractName]; - if (contract === undefined) { - this.logger.error(`Could not fetch the contract ${finalContractName}.`); - } - return contract; - } } export default Web3Service; diff --git a/src/modules/module-config-validation.js b/src/modules/module-config-validation.js index f826314ea..ffd5a51ba 100644 --- a/src/modules/module-config-validation.js +++ b/src/modules/module-config-validation.js @@ -12,7 +12,7 @@ class ModuleConfigValidation { if (typeof this[`validate${capitalizedName}`] === 'function') { this[`validate${capitalizedName}`](config); } else { - throw Error(`Missing validation for ${capitalizedName}`); + throw new Error(`Missing validation for ${capitalizedName}`); } } @@ -36,8 +36,62 @@ class ModuleConfigValidation { return true; } - validateBlockchainEvents() { - return true; + validateBlockchainEvents(config) { + const occurences = {}; + for (const implementation of Object.values(config.implementation)) { + // eslint-disable-next-line no-continue + if (!implementation.enabled) { + continue; + } + + if (implementation.config.blockchains.length === 0) { + throw new Error( + 'Blockchains must be specified in the blockchain events service config.', + ); + } + + if ( + implementation.config.blockchains.length > + Object.keys(implementation.config.rpcEndpoints).length + ) { + throw new Error('Missing RPC edpoints in the blockchain events service config.'); + } + + if ( + implementation.config.blockchains.length > + Object.keys(implementation.config.hubContractAddress).length + ) { + throw new Error('Missing hub addresses in the blockchain events service config.'); + } + + for (const blockchain of implementation.config.blockchains) { + if (!occurences[blockchain]) { + occurences[blockchain] = 0; + } + occurences[blockchain] += 1; + + if (occurences[blockchain] > 1) { + throw new Error( + `Exactly one blockchain events service for blockchain ${blockchain} needs to be defined.`, + ); + } + + if ( + !implementation.config.rpcEndpoints[blockchain] || + implementation.config.rpcEndpoints[blockchain].length === 0 + ) { + throw new Error( + `RPC endpoint is not defined for blockchain: ${blockchain} in the blockchain events service config.`, + ); + } + + if (!implementation.config.hubContractAddress[blockchain]) { + throw new Error( + `Hub contract address is not defined for blockchain: ${blockchain} in the blockchain events service config.`, + ); + } + } + } } validateTripleStore(config) { @@ -57,7 +111,9 @@ class ModuleConfigValidation { } for (const repository of Object.values(TRIPLE_STORE_REPOSITORIES)) { if (occurences[repository] !== 1) { - throw Error(`Exactly one config for repository ${repository} needs to be defined.`); + throw new Error( + `Exactly one config for repository ${repository} needs to be defined.`, + ); } } } diff --git a/src/modules/repository/implementation/sequelize/migrations/20241129120000-add-commands-is_blocking.js b/src/modules/repository/implementation/sequelize/migrations/20241129120000-add-commands-is_blocking.js new file mode 100644 index 000000000..16c78763b --- /dev/null +++ b/src/modules/repository/implementation/sequelize/migrations/20241129120000-add-commands-is_blocking.js @@ -0,0 +1,9 @@ +export async function up({ context: { queryInterface, Sequelize } }) { + await queryInterface.addColumn('commands', 'is_blocking', { + type: Sequelize.BOOLEAN, + }); +} + +export async function down({ context: { queryInterface } }) { + await queryInterface.removeColumn('commands', 'is_blocking'); +} diff --git a/src/modules/repository/implementation/sequelize/migrations/20241201152000-update-blockchain-events.js b/src/modules/repository/implementation/sequelize/migrations/20241201152000-update-blockchain-events.js new file mode 100644 index 000000000..c57b2ac4f --- /dev/null +++ b/src/modules/repository/implementation/sequelize/migrations/20241201152000-update-blockchain-events.js @@ -0,0 +1,21 @@ +export async function up({ context: { queryInterface, Sequelize } }) { + await queryInterface.renameColumn('blockchain_event', 'blockchain_id', 'blockchain'); + + await queryInterface.changeColumn('blockchain_event', 'block', { + type: Sequelize.BIGINT, + }); + + await queryInterface.addColumn('blockchain_event', 'priority', { + type: Sequelize.BIGINT, + }); +} + +export async function down({ context: { queryInterface, Sequelize } }) { + await queryInterface.renameColumn('blockchain_event', 'blockchain', 'blockchain_id'); + + await queryInterface.changeColumn('blockchain_event', 'block', { + type: Sequelize.INTEGER, + }); + + await queryInterface.removeColumn('blockchain_event', 'priority'); +} diff --git a/src/modules/repository/implementation/sequelize/models/blockchain-event.js b/src/modules/repository/implementation/sequelize/models/blockchain-event.js index b15537849..446b0e012 100644 --- a/src/modules/repository/implementation/sequelize/models/blockchain-event.js +++ b/src/modules/repository/implementation/sequelize/models/blockchain-event.js @@ -8,10 +8,11 @@ export default (sequelize, DataTypes) => { autoIncrement: true, }, contract: DataTypes.STRING, - blockchainId: DataTypes.STRING, + blockchain: DataTypes.STRING, event: DataTypes.STRING, data: DataTypes.TEXT, - block: DataTypes.INTEGER, + block: DataTypes.BIGINT, + priority: DataTypes.BIGINT, processed: DataTypes.BOOLEAN, createdAt: DataTypes.DATE, updatedAt: DataTypes.DATE, diff --git a/src/modules/repository/implementation/sequelize/models/commands.js b/src/modules/repository/implementation/sequelize/models/commands.js index e8c48316b..a0396d6d6 100644 --- a/src/modules/repository/implementation/sequelize/models/commands.js +++ b/src/modules/repository/implementation/sequelize/models/commands.js @@ -18,6 +18,7 @@ export default (sequelize, DataTypes) => { name: DataTypes.STRING, data: DataTypes.JSON, priority: DataTypes.BIGINT, + isBlocking: DataTypes.BOOLEAN, sequence: DataTypes.JSON, readyAt: DataTypes.BIGINT, delay: DataTypes.BIGINT, diff --git a/src/modules/repository/implementation/sequelize/repositories/blockchain-event-repository.js b/src/modules/repository/implementation/sequelize/repositories/blockchain-event-repository.js index 92f4edf03..436be043b 100644 --- a/src/modules/repository/implementation/sequelize/repositories/blockchain-event-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/blockchain-event-repository.js @@ -19,7 +19,7 @@ class BlockchainEventRepository { event: event.event, data: event.data, block: event.block, - blockchainId: event.blockchainId, + blockchain: event.blockchain, processed: false, })), { @@ -33,10 +33,10 @@ class BlockchainEventRepository { return insertedEvents; } - async getAllUnprocessedBlockchainEvents(eventNames, blockchainId) { + async getAllUnprocessedBlockchainEvents(eventNames, blockchain) { return this.model.findAll({ where: { - blockchainId, + blockchain, processed: false, event: { [Sequelize.Op.in]: eventNames }, }, @@ -44,14 +44,14 @@ class BlockchainEventRepository { }); } - async blockchainEventExists(contract, event, data, block, blockchainId) { + async blockchainEventExists(contract, event, data, block, blockchain) { const dbEvent = await this.model.findOne({ where: { contract, event, data, block, - blockchainId, + blockchain, }, }); return !!dbEvent; diff --git a/src/modules/repository/implementation/sequelize/repositories/command-repository.js b/src/modules/repository/implementation/sequelize/repositories/command-repository.js index 44c08b6be..dd0117823 100644 --- a/src/modules/repository/implementation/sequelize/repositories/command-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/command-repository.js @@ -68,6 +68,23 @@ class CommandRepository { limit, }); } + + async findUnfinalizedCommandsByName(name) { + return this.model.findAll({ + where: { + name, + status: { + [Sequelize.Op.notIn]: [ + COMMAND_STATUS.COMPLETED, + COMMAND_STATUS.FAILED, + COMMAND_STATUS.EXPIRED, + COMMAND_STATUS.UNKNOWN, + ], + }, + }, + raw: true, + }); + } } export default CommandRepository; diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index 31685ffca..8e604839a 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -64,6 +64,10 @@ class RepositoryModuleManager extends BaseModuleManager { return this.getRepository('command').findFinalizedCommands(timestamp, limit); } + async findUnfinalizedCommandsByName(limit) { + return this.getRepository('command').findUnfinalizedCommandsByName(limit); + } + async createOperationIdRecord(handlerData) { return this.getRepository('operation_id').createOperationIdRecord(handlerData); } diff --git a/src/service/blockchain-events-service.js b/src/service/blockchain-events-service.js index 33a5cb8e6..42eb96aed 100644 --- a/src/service/blockchain-events-service.js +++ b/src/service/blockchain-events-service.js @@ -17,9 +17,16 @@ class BlockchainEventsService { } } + async getBlock(blockchain, tag = 'latest') { + return this.blockchainEventsModuleManager.getBlock( + this.blockchainEventsServicesImplementations[blockchain], + blockchain, + tag, + ); + } + async getPastEvents( blockchain, - contract, contractName, eventsToFilter, lastCheckedBlock, @@ -29,7 +36,6 @@ class BlockchainEventsService { return this.blockchainEventsModuleManager.getPastEvents( this.blockchainEventsServicesImplementations[blockchain], blockchain, - contract, contractName, eventsToFilter, lastCheckedBlock, diff --git a/src/service/sharding-table-service.js b/src/service/sharding-table-service.js index 3213ea77e..cd75b4fda 100644 --- a/src/service/sharding-table-service.js +++ b/src/service/sharding-table-service.js @@ -33,9 +33,6 @@ class ShardingTableService { await Promise.all(pullBlockchainShardingTables); await this.networkModuleManager.onPeerConnected((connection) => { - this.logger.trace( - `Node connected to ${connection.remotePeer.toB58String()}, updating sharding table last seen and last dialed.`, - ); this.updatePeerRecordLastSeenAndLastDialed(connection.remotePeer.toB58String()).catch( (error) => { this.logger.warn(`Unable to update connected peer, error: ${error.message}`); @@ -47,7 +44,7 @@ class ShardingTableService { async pullBlockchainShardingTable(blockchainId, force = false) { const lastCheckedBlock = await this.repositoryModuleManager.getLastCheckedBlock( blockchainId, - CONTRACTS.SHARDING_TABLE_CONTRACT, + CONTRACTS.SHARDING_TABLE, ); if ( diff --git a/tools/local-network-setup/.origintrail_noderc_template.json b/tools/local-network-setup/.origintrail_noderc_template.json index 5178fcf07..f8e09ba83 100644 --- a/tools/local-network-setup/.origintrail_noderc_template.json +++ b/tools/local-network-setup/.origintrail_noderc_template.json @@ -108,6 +108,26 @@ } } } + }, + "blockchainEvents": { + "enabled": true, + "implementation": { + "ot-ethers": { + "enabled": true, + "package": "./blockchain-events/implementation/ot-ethers/ot-ethers.js", + "config": { + "blockchains": ["hardhat1:31337", "hardhat2:31337"], + "rpcEndpoints": { + "hardhat1:31337": ["http://localhost:8545"], + "hardhat2:31337": ["http://localhost:9545"] + }, + "hubContractAddress": { + "hardhat1:31337": "0x5FbDB2315678afecb367f032d93F642f64180aa3", + "hardhat2:31337": "0x5FbDB2315678afecb367f032d93F642f64180aa3" + } + } + } + } } }, "auth": {