diff --git a/config/config.json b/config/config.json index 9906ca1803..7b490510c4 100644 --- a/config/config.json +++ b/config/config.json @@ -166,17 +166,25 @@ } } } + }, + "telemetry": { + "enabled": true, + "implementation": { + "ot-telemetry": { + "enabled": true, + "package": "./telemetry/implementation/ot-telemetry.js", + "config": { + "sendTelemetryData": false, + "signalingServerUrl": "null" + } + } + } } }, "maximumAssertionSizeInKb": 2500, "commandExecutorVerboseLoggingEnabled": false, "appDataPath": "data", "logLevel": "info", - "telemetry": { - "enabled": true, - "sendTelemetryData": false, - "signalingServerUrl": "null" - }, "auth": { "ipBasedAuthEnabled": true, "tokenBasedAuthEnabled": false, @@ -301,17 +309,25 @@ "config": {} } } + }, + "telemetry": { + "enabled": true, + "implementation": { + "ot-telemetry": { + "enabled": true, + "package": "./telemetry/implementation/ot-telemetry.js", + "config": { + "sendTelemetryData": false, + "signalingServerUrl": "null" + } + } + } } }, "maximumAssertionSizeInKb": 2500, "commandExecutorVerboseLoggingEnabled": false, "appDataPath": "data", "logLevel": "trace", - "telemetry": { - "enabled": true, - "sendTelemetryData": false, - "signalingServerUrl": "null" - }, "auth": { "ipBasedAuthEnabled": true, "tokenBasedAuthEnabled": false, @@ -448,17 +464,25 @@ "config": {} } } + }, + "telemetry": { + "enabled": true, + "implementation": { + "ot-telemetry": { + "enabled": true, + "package": "./telemetry/implementation/ot-telemetry.js", + "config": { + "sendTelemetryData": true, + "signalingServerUrl": "https://testnet-signaling.origin-trail.network/signal" + } + } + } } }, "maximumAssertionSizeInKb": 2500, "commandExecutorVerboseLoggingEnabled": false, "appDataPath": "data", "logLevel": "trace", - "telemetry": { - "enabled": true, - "sendTelemetryData": true, - "signalingServerUrl": "https://testnet-signaling.origin-trail.network/signal" - }, "auth": { "ipBasedAuthEnabled": true, "tokenBasedAuthEnabled": false, @@ -596,17 +620,25 @@ "config": {} } } + }, + "telemetry": { + "enabled": true, + "implementation": { + "ot-telemetry": { + "enabled": true, + "package": "./telemetry/implementation/ot-telemetry.js", + "config": { + "sendTelemetryData": true, + "signalingServerUrl": "https://mainnet-signaling.origin-trail.network/signal" + } + } + } } }, "maximumAssertionSizeInKb": 2500, "commandExecutorVerboseLoggingEnabled": false, "appDataPath": "data", "logLevel": "trace", - "telemetry": { - "enabled": true, - "sendTelemetryData": true, - "signalingServerUrl": "https://mainnet-signaling.origin-trail.network/signal" - }, "auth": { "ipBasedAuthEnabled": true, "tokenBasedAuthEnabled": false, diff --git a/ot-node.js b/ot-node.js index fa68cfc2af..716669d52d 100644 --- a/ot-node.js +++ b/ot-node.js @@ -11,6 +11,7 @@ import OtnodeUpdateCommand from './src/commands/common/otnode-update-command.js' import OtAutoUpdater from './src/modules/auto-updater/implementation/ot-auto-updater.js'; import PullBlockchainShardingTableMigration from './src/migration/pull-sharding-table-migration.js'; import TripleStoreUserConfigurationMigration from './src/migration/triple-store-user-configuration-migration.js'; +import TelemetryModuleUserConfigurationMigration from './src/migration/telemetry-module-user-configuration-migration.js'; import PrivateAssetsMetadataMigration from './src/migration/private-assets-metadata-migration.js'; import ServiceAgreementsMetadataMigration from './src/migration/service-agreements-metadata-migration.js'; import RemoveAgreementStartEndTimeMigration from './src/migration/remove-agreement-start-end-time-migration.js'; @@ -36,6 +37,7 @@ class OTNode { await this.checkForUpdate(); await this.removeUpdateFile(); await this.executeTripleStoreUserConfigurationMigration(); + await this.executeTelemetryModuleUserConfigurationMigration(); this.logger.info(' ██████╗ ████████╗███╗ ██╗ ██████╗ ██████╗ ███████╗'); this.logger.info('██╔═══██╗╚══██╔══╝████╗ ██║██╔═══██╗██╔══██╗██╔════╝'); this.logger.info('██║ ██║ ██║ ██╔██╗ ██║██║ ██║██║ ██║█████╗'); @@ -65,11 +67,11 @@ class OTNode { await this.initializeCommandExecutor(); await this.initializeShardingTableService(); - await this.initializeTelemetryInjectionService(); await this.initializeBlockchainEventListenerService(); await this.initializeRouters(); await this.startNetworkModule(); + this.startTelemetryModule(); this.resumeCommandExecutor(); this.logger.info('Node is up and running!'); } @@ -275,6 +277,21 @@ class OTNode { await networkModuleManager.start(); } + startTelemetryModule() { + const telemetryModuleManager = this.container.resolve('telemetryModuleManager'); + const repositoryModuleManager = this.container.resolve('repositoryModuleManager'); + telemetryModuleManager.listenOnEvents((eventData) => { + repositoryModuleManager.createEventRecord( + eventData.operationId, + eventData.lastEvent, + eventData.timestamp, + eventData.value1, + eventData.value2, + eventData.value3, + ); + }); + } + async executePrivateAssetsMetadataMigration() { if ( process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || @@ -305,6 +322,25 @@ class OTNode { } } + async executeTelemetryModuleUserConfigurationMigration() { + if ( + process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || + process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST + ) + return; + + const migration = new TelemetryModuleUserConfigurationMigration( + 'telemetryModuleUserConfigurationMigration', + this.logger, + this.config, + ); + if (!(await migration.migrationAlreadyExecuted())) { + await migration.migrate(); + this.logger.info('Node will now restart!'); + this.stop(1); + } + } + async executeTripleStoreUserConfigurationMigration() { if ( process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || @@ -474,22 +510,6 @@ class OTNode { } } - async initializeTelemetryInjectionService() { - if (this.config.telemetry.enabled) { - try { - const telemetryHubModuleManager = this.container.resolve( - 'telemetryInjectionService', - ); - telemetryHubModuleManager.initialize(); - this.logger.info('Telemetry Injection Service initialized successfully'); - } catch (e) { - this.logger.error( - `Telemetry hub module initialization failed. Error message: ${e.message}`, - ); - } - } - } - async removeUpdateFile() { const updateFilePath = this.fileService.getUpdateFilePath(); await this.fileService.removeFile(updateFilePath).catch((error) => { diff --git a/package-lock.json b/package-lock.json index 4fdac68a57..b9088bc25c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "6.0.16", + "version": "6.0.17", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "6.0.16", + "version": "6.0.17", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^2.4.3", diff --git a/package.json b/package.json index 307fadd425..a8218caca2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "6.0.16", + "version": "6.0.17", "description": "OTNode V6", "main": "index.js", "type": "module", diff --git a/src/commands/common/send-telemetry-command.js b/src/commands/common/send-telemetry-command.js index 03fa2ff128..ada1aab635 100644 --- a/src/commands/common/send-telemetry-command.js +++ b/src/commands/common/send-telemetry-command.js @@ -1,4 +1,3 @@ -import axios from 'axios'; import { createRequire } from 'module'; import Command from '../command.js'; import { SEND_TELEMETRY_COMMAND_FREQUENCY_MINUTES } from '../../constants/constants.js'; @@ -11,9 +10,10 @@ class SendTelemetryCommand extends Command { super(ctx); this.logger = ctx.logger; this.config = ctx.config; - this.telemetryInjectionService = ctx.telemetryInjectionService; this.networkModuleManager = ctx.networkModuleManager; this.blockchainModuleManager = ctx.blockchainModuleManager; + this.repositoryModuleManager = ctx.repositoryModuleManager; + this.telemetryModuleManager = ctx.telemetryModuleManager; } /** @@ -21,35 +21,31 @@ class SendTelemetryCommand extends Command { * @param command */ async execute() { - if (!this.config.telemetry.enabled || !this.config.telemetry.sendTelemetryData) { + if ( + !this.config.modules.telemetry.enabled || + !this.telemetryModuleManager.getModuleConfiguration().sendTelemetryData + ) { return Command.empty(); } + try { - const events = await this.telemetryInjectionService.getUnpublishedEvents(); - const signalingMessage = { - nodeData: { - version: pjson.version, - identity: this.networkModuleManager.getPeerId().toB58String(), - hostname: this.config.hostname, - operational_wallet: this.blockchainModuleManager.getPublicKey(), - management_wallet: this.blockchainModuleManager.getManagementKey(), - triple_store: this.config.modules.tripleStore.defaultImplementation, - auto_update_enabled: this.config.modules.autoUpdater.enabled, - multiaddresses: this.networkModuleManager.getMultiaddrs(), - }, - events: events || [], - }; - const config = { - method: 'post', - url: this.config.telemetry.signalingServerUrl, - headers: { - 'Content-Type': 'application/json', - }, - data: JSON.stringify(signalingMessage), + const events = (await this.getUnpublishedEvents()) || []; + const nodeData = { + version: pjson.version, + identity: this.networkModuleManager.getPeerId().toB58String(), + hostname: this.config.hostname, + operational_wallet: this.blockchainModuleManager.getPublicKey(), + management_wallet: this.blockchainModuleManager.getManagementKey(), + triple_store: this.config.modules.tripleStore.defaultImplementation, + auto_update_enabled: this.config.modules.autoUpdater.enabled, + multiaddresses: this.networkModuleManager.getMultiaddrs(), }; - const response = await axios(config); - if (response.status === 200 && events?.length > 0) { - await this.telemetryInjectionService.removePublishedEvents(events); + const isDataSuccessfullySent = await this.telemetryModuleManager.sendTelemetryData( + nodeData, + events, + ); + if (isDataSuccessfullySent && events?.length > 0) { + await this.removePublishedEvents(events); } } catch (e) { await this.handleError(e); @@ -83,6 +79,16 @@ class SendTelemetryCommand extends Command { Object.assign(command, map); return command; } + + async getUnpublishedEvents() { + return this.repositoryModuleManager.getUnpublishedEvents(); + } + + async removePublishedEvents(events) { + const ids = events.map((event) => event.id); + + await this.repositoryModuleManager.destroyEvents(ids); + } } export default SendTelemetryCommand; diff --git a/src/commands/protocols/common/submit-commit-command.js b/src/commands/protocols/common/submit-commit-command.js index 7c9cfba13a..b2312b72ee 100644 --- a/src/commands/protocols/common/submit-commit-command.js +++ b/src/commands/protocols/common/submit-commit-command.js @@ -49,9 +49,10 @@ class SubmitCommitCommand extends Command { stateIndex, ); if (alreadySubmitted) { - this.logger.trace( - `Commit already submitted for blockchain: ${blockchain} agreement id: ${agreementId}, epoch: ${epoch}, state index: ${stateIndex}`, - ); + const errorMessage = `Commit already submitted for blockchain: ${blockchain} agreement id: ${agreementId}, epoch: ${epoch}, state index: ${stateIndex}`; + this.logger.trace(errorMessage); + + await this.handleError(operationId, errorMessage, this.errorType, true); return Command.empty(); } diff --git a/src/commands/protocols/common/submit-proofs-command.js b/src/commands/protocols/common/submit-proofs-command.js index 8348480f84..2e44f23ba2 100644 --- a/src/commands/protocols/common/submit-proofs-command.js +++ b/src/commands/protocols/common/submit-proofs-command.js @@ -66,7 +66,10 @@ class SubmitProofsCommand extends Command { ); if (!assertion.length) { - this.logger.trace(`Assertion with id: ${assertionId} not found in triple store.`); + const errorMessage = `Assertion with id: ${assertionId} not found in triple store.`; + this.logger.trace(errorMessage); + + await this.handleError(operationId, errorMessage, this.errorType, true); return Command.empty(); } @@ -97,9 +100,18 @@ class SubmitProofsCommand extends Command { stateIndex, ); if (alreadySubmitted) { - this.logger.trace( - `Proofs already submitted for blockchain: ${blockchain} agreement id: ${agreementId}, epoch: ${epoch}, state index: ${stateIndex}`, - ); + const errorMessage = `Proofs already submitted for blockchain: ${blockchain} agreement id: ${agreementId}, epoch: ${epoch}, state index: ${stateIndex}`; + this.logger.trace(errorMessage); + + await this.handleError(operationId, errorMessage, this.errorType, true); + return Command.empty(); + } + + if (proof.length === 0) { + const errorMessage = `Error during Merkle Proof calculation for blockchain: ${blockchain} agreement id: ${agreementId}, epoch: ${epoch}, state index: ${stateIndex}, proof cannot be empty`; + this.logger.trace(errorMessage); + + await this.handleError(operationId, errorMessage, this.errorType, true); return Command.empty(); } diff --git a/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js b/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js index 0caa031c63..5e82bd6178 100644 --- a/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js +++ b/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js @@ -83,22 +83,6 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand { stateIndex, ); - await this.repositoryModuleManager.updateServiceAgreementRecord( - blockchain, - contract, - tokenId, - agreementId, - agreementData.startTime, - agreementData.epochsNumber, - agreementData.epochLength, - agreementData.scoreFunctionId, - agreementData.proofWindowOffsetPerc, - hashFunctionId, - keyword, - assertionId, - stateIndex, - ); - await this.operationIdService.updateOperationIdStatus( operationId, OPERATION_ID_STATUS.PUBLISH.PUBLISH_LOCAL_STORE_END, diff --git a/src/constants/constants.js b/src/constants/constants.js index 3fff5854e4..285441faee 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -21,7 +21,7 @@ export const COMMIT_BLOCK_DURATION_IN_BLOCKS = 5; export const COMMITS_DELAY_BETWEEN_NODES_IN_BLOCKS = 2; -export const TRANSACTION_POLLING_TIMEOUT_MILLIS = 50 * 1000; +export const TRANSACTION_POLLING_TIMEOUT_MILLIS = 120 * 1000; export const SOLIDITY_ERROR_STRING_PREFIX = '0x08c379a0'; @@ -167,9 +167,9 @@ export const DEFAULT_COMMAND_REPEAT_INTERVAL_IN_MILLS = 5000; // 5 seconds export const DEFAULT_COMMAND_DELAY_IN_MILLS = 60 * 1000; // 60 seconds export const COMMAND_RETRIES = { - SUBMIT_COMMIT: 3, - SUBMIT_UPDATE_COMMIT: 3, - SUBMIT_PROOFS: 3, + SUBMIT_COMMIT: 0, + SUBMIT_UPDATE_COMMIT: 0, + SUBMIT_PROOFS: 0, }; export const WEBSOCKET_PROVIDER_OPTIONS = { diff --git a/src/migration/telemetry-module-user-configuration-migration.js b/src/migration/telemetry-module-user-configuration-migration.js new file mode 100644 index 0000000000..a49cc393af --- /dev/null +++ b/src/migration/telemetry-module-user-configuration-migration.js @@ -0,0 +1,45 @@ +import appRootPath from 'app-root-path'; +import path from 'path'; +import BaseMigration from './base-migration.js'; + +class TelemetryModuleUserConfigurationMigration extends BaseMigration { + async executeMigration() { + const configurationFolderPath = path.join(appRootPath.path, '..'); + const configurationFilePath = path.join( + configurationFolderPath, + this.config.configFilename, + ); + + const userConfiguration = await this.fileService.readFile(configurationFilePath, true); + + let newTelemetryConfig; + + if ('telemetry' in userConfiguration) { + const oldConfigTelemetry = userConfiguration.telemetry; + newTelemetryConfig = { + enabled: oldConfigTelemetry.enabled, + implementation: { + 'ot-telemetry': { + enabled: oldConfigTelemetry.enabled, + package: './telemetry/implementation/ot-telemetry.js', + config: { + sendTelemetryData: oldConfigTelemetry.sendTelemetryData, + signalingServerUrl: oldConfigTelemetry.signalingServerUrl, + }, + }, + }, + }; + + delete userConfiguration.telemetry; + userConfiguration.modules.telemetry = newTelemetryConfig; + + await this.fileService.writeContentsToFile( + configurationFolderPath, + this.config.configFilename, + JSON.stringify(userConfiguration, null, 4), + ); + } + } +} + +export default TelemetryModuleUserConfigurationMigration; diff --git a/src/modules/base-module-manager.js b/src/modules/base-module-manager.js index e96161999c..2bdc901d6a 100644 --- a/src/modules/base-module-manager.js +++ b/src/modules/base-module-manager.js @@ -91,7 +91,7 @@ class BaseModuleManager { delete this.handlers[name]; } - getModuleConfiguration(name) { + getModuleConfiguration(name = null) { return this.getImplementation(name).config; } } diff --git a/src/modules/module-config-validation.js b/src/modules/module-config-validation.js index 4eaaf02b42..fc2cd00619 100644 --- a/src/modules/module-config-validation.js +++ b/src/modules/module-config-validation.js @@ -8,9 +8,12 @@ class ModuleConfigValidation { validateModule(name, config) { this.validateRequiredModule(name, config); - const capitalizedName = name.charAt(0).toUpperCase() + name.slice(1); - this[`validate${capitalizedName}`](config); + if (typeof this[`validate${capitalizedName}`] === 'function') { + this[`validate${capitalizedName}`](config); + } else { + throw Error(`Missing validation for ${capitalizedName}`); + } } validateAutoUpdater() { @@ -68,6 +71,10 @@ class ModuleConfigValidation { this.logger.warn(message); } } + + validateTelemetry() { + return true; + } } export default ModuleConfigValidation; diff --git a/src/modules/telemetry/implementation/ot-telemetry.js b/src/modules/telemetry/implementation/ot-telemetry.js new file mode 100644 index 0000000000..c4460c7572 --- /dev/null +++ b/src/modules/telemetry/implementation/ot-telemetry.js @@ -0,0 +1,29 @@ +import axios from 'axios'; + +class OTTelemetry { + async initialize(config, logger) { + this.config = config; + this.logger = logger; + } + + listenOnEvents(eventEmitter, onEventReceived) { + return eventEmitter.on('operation_status_changed', onEventReceived); + } + + async sendTelemetryData(nodeData, events) { + const signalingMessage = { nodeData, events }; + const config = { + method: 'post', + url: this.config.signalingServerUrl, + headers: { + 'Content-Type': 'application/json', + }, + data: JSON.stringify(signalingMessage), + }; + const response = await axios(config); + const isSuccess = response.status === 200; + return isSuccess; + } +} + +export default OTTelemetry; diff --git a/src/modules/telemetry/telemetry-module-manager.js b/src/modules/telemetry/telemetry-module-manager.js new file mode 100644 index 0000000000..f5ef04377e --- /dev/null +++ b/src/modules/telemetry/telemetry-module-manager.js @@ -0,0 +1,29 @@ +import BaseModuleManager from '../base-module-manager.js'; + +class TelemetryModuleManager extends BaseModuleManager { + constructor(ctx) { + super(ctx); + this.eventEmitter = ctx.eventEmitter; + } + + getName() { + return 'telemetry'; + } + + listenOnEvents(onEventReceived) { + if (this.config.modules.telemetry.enabled && this.initialized) { + return this.getImplementation().module.listenOnEvents( + this.eventEmitter, + onEventReceived, + ); + } + } + + async sendTelemetryData(nodeData, events) { + if (this.initialized) { + return this.getImplementation().module.sendTelemetryData(nodeData, events); + } + } +} + +export default TelemetryModuleManager; diff --git a/src/service/telemetry-injection-service.js b/src/service/telemetry-injection-service.js deleted file mode 100644 index be59e6ca4f..0000000000 --- a/src/service/telemetry-injection-service.js +++ /dev/null @@ -1,37 +0,0 @@ -class TelemetryInjectionService { - constructor(ctx) { - this.logger = ctx.logger; - this.config = ctx.config; - this.eventEmitter = ctx.eventEmitter; - this.repositoryModuleManager = ctx.repositoryModuleManager; - } - - initialize() { - this.listenOnEvents(); - } - - listenOnEvents() { - this.eventEmitter.on('operation_status_changed', (eventData) => { - this.repositoryModuleManager.createEventRecord( - eventData.operationId, - eventData.lastEvent, - eventData.timestamp, - eventData.value1, - eventData.value2, - eventData.value3, - ); - }); - } - - async getUnpublishedEvents() { - return this.repositoryModuleManager.getUnpublishedEvents(); - } - - async removePublishedEvents(events) { - const ids = events.map((event) => event.id); - - await this.repositoryModuleManager.destroyEvents(ids); - } -} - -export default TelemetryInjectionService; diff --git a/test/modules/telemetry/config.json b/test/modules/telemetry/config.json new file mode 100644 index 0000000000..66c5eddea5 --- /dev/null +++ b/test/modules/telemetry/config.json @@ -0,0 +1,17 @@ +{ + "modules": { + "telemetry": { + "enabled": true, + "implementation": { + "ot-telemetry": { + "enabled": true, + "package": "./telemetry/implementation/ot-telemetry.js", + "config": { + "sendTelemetryData": false, + "signalingServerUrl": "null" + } + } + } + } + } +} diff --git a/test/modules/telemetry/telemetry.js b/test/modules/telemetry/telemetry.js new file mode 100644 index 0000000000..6a7c037fa7 --- /dev/null +++ b/test/modules/telemetry/telemetry.js @@ -0,0 +1,51 @@ +import { readFile } from 'fs/promises'; +import { describe, it, before } from 'mocha'; +import { expect, assert } from 'chai'; +import Logger from '../../../src/logger/logger.js'; +import TelemetryModuleManager from '../../../src/modules/telemetry/telemetry-module-manager.js'; + +let logger; +let telemetryModuleManager; +const config = JSON.parse(await readFile('./test/modules/telemetry/config.json')); + +describe('Telemetry module', () => { + before('Initialize logger', () => { + logger = new Logger('trace'); + logger.info = () => {}; + }); + + describe('Handle received events', () => { + it('should call onEventReceived when event is emitted', async () => { + const eventEmitter = { + eventListeners: {}, + + on(eventName, callback) { + if (!this.eventListeners[eventName]) { + this.eventListeners[eventName] = []; + } + this.eventListeners[eventName].push(callback); + }, + + emit(eventName, ...args) { + if (this.eventListeners[eventName]) { + this.eventListeners[eventName].forEach((callback) => callback(...args)); + } + }, + }; + + let callbackCalled = false; + + function onEventReceived() { + callbackCalled = true; + } + + telemetryModuleManager = new TelemetryModuleManager({ config, logger, eventEmitter }); + await telemetryModuleManager.initialize(); + telemetryModuleManager.listenOnEvents(onEventReceived); + + eventEmitter.emit('operation_status_changed'); + + assert(expect(callbackCalled).to.be.true); + }); + }); +});