diff --git a/config/config.json b/config/config.json index b516c06fb5..7d06fb7061 100644 --- a/config/config.json +++ b/config/config.json @@ -151,13 +151,13 @@ "telemetry": { "enabled": true, "implementation": { - "ot-telemetry": { + "quest-telemetry": { "enabled": true, - "package": "./telemetry/implementation/ot-telemetry.js", + "package": "./telemetry/implementation/quest-telemetry.js", "config": { - "sendTelemetryData": false, - "signalingServerUrl": "null" - } + "ip_endpoint": "http::addr=localhost:10000", + "sendTelemetryData": false + } } } } @@ -319,13 +319,13 @@ "telemetry": { "enabled": true, "implementation": { - "ot-telemetry": { + "quest-telemetry": { "enabled": true, - "package": "./telemetry/implementation/ot-telemetry.js", + "package": "./telemetry/implementation/quest-telemetry.js", "config": { - "sendTelemetryData": false, - "signalingServerUrl": "null" - } + "ip_endpoint": "http::addr=localhost:10000", + "sendTelemetryData": false + } } } } @@ -487,13 +487,13 @@ "telemetry": { "enabled": true, "implementation": { - "ot-telemetry": { + "quest-telemetry": { "enabled": true, - "package": "./telemetry/implementation/ot-telemetry.js", + "package": "./telemetry/implementation/quest-telemetry.js", "config": { - "sendTelemetryData": true, - "signalingServerUrl": "https://v8-testnet-signaling.origin-trail.network/signal" - } + "ip_endpoint": "http::addr=localhost:10000", + "sendTelemetryData": false + } } } } @@ -654,13 +654,13 @@ "telemetry": { "enabled": true, "implementation": { - "ot-telemetry": { + "quest-telemetry": { "enabled": true, - "package": "./telemetry/implementation/ot-telemetry.js", + "package": "./telemetry/implementation/quest-telemetry.js", "config": { - "sendTelemetryData": true, - "signalingServerUrl": "https://devnet-signaling.origin-trail.network/signal" - } + "ip_endpoint": "http::addr=localhost:10000", + "sendTelemetryData": false + } } } } @@ -844,13 +844,13 @@ "telemetry": { "enabled": true, "implementation": { - "ot-telemetry": { + "quest-telemetry": { "enabled": true, - "package": "./telemetry/implementation/ot-telemetry.js", + "package": "./telemetry/implementation/quest-telemetry.js", "config": { - "sendTelemetryData": true, - "signalingServerUrl": "https://mainnet-signaling.origin-trail.network/signal" - } + "ip_endpoint": "http::addr=localhost:10000", + "sendTelemetryData": false + } } } } diff --git a/ot-node.js b/ot-node.js index e454613d68..3be681c472 100644 --- a/ot-node.js +++ b/ot-node.js @@ -104,7 +104,6 @@ class OTNode { await this.initializeRouters(); await this.startNetworkModule(); - this.startTelemetryModule(); this.resumeCommandExecutor(); this.logger.info('Node is up and running!'); @@ -348,22 +347,6 @@ class OTNode { await networkModuleManager.start(); } - startTelemetryModule() { - const telemetryModuleManager = this.container.resolve('telemetryModuleManager'); - const repositoryModuleManager = this.container.resolve('repositoryModuleManager'); - telemetryModuleManager.listenOnEvents((eventData) => { - repositoryModuleManager.createEventRecord( - eventData.operationId, - eventData.blockchainId, - eventData.lastEvent, - eventData.timestamp, - eventData.value1, - eventData.value2, - eventData.value3, - ); - }); - } - async initializeShardingTableService() { try { const shardingTableService = this.container.resolve('shardingTableService'); diff --git a/package-lock.json b/package-lock.json index b0cba1c9e1..6464e11274 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17,6 +17,7 @@ "@polkadot/keyring": "^10.1.7", "@polkadot/util": "^10.1.7", "@polkadot/util-crypto": "^10.1.7", + "@questdb/nodejs-client": "^3.0.0", "app-root-path": "^3.1.0", "assertion-tools": "^2.0.2", "async": "^3.2.4", @@ -4835,6 +4836,11 @@ "version": "1.1.0", "license": "BSD-3-Clause" }, + "node_modules/@questdb/nodejs-client": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/@questdb/nodejs-client/-/nodejs-client-3.0.0.tgz", + "integrity": "sha512-lBKd732rRpS/pqyWgCJFVXi9N1YoWDAUZzp6BngBVxH92As/NDXigZmCYKQVKpAEGYx14606CH5AoJ23qf3oqA==" + }, "node_modules/@rdfjs/types": { "version": "1.1.0", "license": "MIT", @@ -24807,6 +24813,11 @@ "@protobufjs/utf8": { "version": "1.1.0" }, + "@questdb/nodejs-client": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/@questdb/nodejs-client/-/nodejs-client-3.0.0.tgz", + "integrity": "sha512-lBKd732rRpS/pqyWgCJFVXi9N1YoWDAUZzp6BngBVxH92As/NDXigZmCYKQVKpAEGYx14606CH5AoJ23qf3oqA==" + }, "@rdfjs/types": { "version": "1.1.0", "requires": { diff --git a/package.json b/package.json index 53b0c31e44..bf483d956c 100644 --- a/package.json +++ b/package.json @@ -68,6 +68,7 @@ "@ethersproject/bytes": "^5.7.0", "@ethersproject/hash": "^5.7.0", "@ethersproject/wallet": "^5.7.0", + "@questdb/nodejs-client": "^3.0.0", "@polkadot/api": "^9.3.2", "@polkadot/keyring": "^10.1.7", "@polkadot/util": "^10.1.7", diff --git a/src/modules/repository/implementation/sequelize/repositories/event-repository.js b/src/modules/repository/implementation/sequelize/repositories/event-repository.js index 276f83ce52..8304bfc66d 100644 --- a/src/modules/repository/implementation/sequelize/repositories/event-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/event-repository.js @@ -11,18 +11,6 @@ class EventRepository { this.model = models.event; } - async createEventRecord(operationId, blockchainId, name, timestamp, value1, value2, value3) { - return this.model.create({ - operationId, - blockchainId, - name, - timestamp, - value1, - value2, - value3, - }); - } - async getUnpublishedEvents() { // events without COMPLETE/FAILED status which are older than 30min // are also considered finished diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index e6dba2989c..405a5bf02d 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -208,26 +208,6 @@ class RepositoryModuleManager extends BaseModuleManager { return this.getRepository('shard').cleanShardingTable(blockchainId); } - async createEventRecord( - operationId, - blockchainId, - name, - timestamp, - value1 = null, - value2 = null, - value3 = null, - ) { - return this.getRepository('event').createEventRecord( - operationId, - blockchainId, - name, - timestamp, - value1, - value2, - value3, - ); - } - async getUnpublishedEvents() { return this.getRepository('event').getUnpublishedEvents(); } diff --git a/src/modules/telemetry/implementation/ot-telemetry.js b/src/modules/telemetry/implementation/ot-telemetry.js deleted file mode 100644 index c4460c7572..0000000000 --- a/src/modules/telemetry/implementation/ot-telemetry.js +++ /dev/null @@ -1,29 +0,0 @@ -import axios from 'axios'; - -class OTTelemetry { - async initialize(config, logger) { - this.config = config; - this.logger = logger; - } - - listenOnEvents(eventEmitter, onEventReceived) { - return eventEmitter.on('operation_status_changed', onEventReceived); - } - - async sendTelemetryData(nodeData, events) { - const signalingMessage = { nodeData, events }; - const config = { - method: 'post', - url: this.config.signalingServerUrl, - headers: { - 'Content-Type': 'application/json', - }, - data: JSON.stringify(signalingMessage), - }; - const response = await axios(config); - const isSuccess = response.status === 200; - return isSuccess; - } -} - -export default OTTelemetry; diff --git a/src/modules/telemetry/implementation/quest-telemetry.js b/src/modules/telemetry/implementation/quest-telemetry.js new file mode 100644 index 0000000000..9c5fca362f --- /dev/null +++ b/src/modules/telemetry/implementation/quest-telemetry.js @@ -0,0 +1,45 @@ +import { Sender } from '@questdb/nodejs-client'; + +class QuestTelemetry { + async initialize(config, logger) { + this.config = config; + this.logger = logger; + this.sender = Sender.fromConfig(this.config.ip_endpoint); + } + + listenOnEvents(eventEmitter, onEventReceived) { + return eventEmitter.on('operation_status_changed', onEventReceived); + } + + async sendTelemetryData( + operationId, + timestamp, + blockchainId = '', + name = '', + value1 = null, + value2 = null, + value3 = null, + ) { + try { + const table = this.sender.table('event'); + + table.symbol('operationId', operationId || 'NULL'); + table.symbol('blockchainId', blockchainId || 'NULL'); + table.symbol('name', name || 'NULL'); + if (value1 !== null) table.symbol('value1', value1); + if (value2 !== null) table.symbol('value2', value2); + if (value3 !== null) table.symbol('value3', value3); + table.timestampColumn('timestamp', timestamp * 1000); + + await table.at(Date.now(), 'ms'); + await this.sender.flush(); + await this.sender.close(); + + this.logger.info('Event telemetry successfully sent to QuestDB'); + } catch (err) { + this.logger.error(`Error sending telemetry to QuestDB: ${err.message}`); + } + } +} + +export default QuestTelemetry; diff --git a/src/modules/telemetry/telemetry-module-manager.js b/src/modules/telemetry/telemetry-module-manager.js index f5ef04377e..19b8e1182f 100644 --- a/src/modules/telemetry/telemetry-module-manager.js +++ b/src/modules/telemetry/telemetry-module-manager.js @@ -10,6 +10,22 @@ class TelemetryModuleManager extends BaseModuleManager { return 'telemetry'; } + async initialize() { + await super.initialize(); + + this.listenOnEvents((eventData) => { + this.sendTelemetryData( + eventData.operationId, + eventData.timestamp, + eventData.blockchainId, + eventData.lastEvent, + eventData.value1, + eventData.value2, + eventData.value3, + ); + }); + } + listenOnEvents(onEventReceived) { if (this.config.modules.telemetry.enabled && this.initialized) { return this.getImplementation().module.listenOnEvents( @@ -19,9 +35,17 @@ class TelemetryModuleManager extends BaseModuleManager { } } - async sendTelemetryData(nodeData, events) { - if (this.initialized) { - return this.getImplementation().module.sendTelemetryData(nodeData, events); + async sendTelemetryData(operationId, timestamp, blockchainId, name, value1, value2, value3) { + if (this.getImplementation().config.sendTelemetryData && this.initialized) { + return this.getImplementation().module.sendTelemetryData( + operationId, + timestamp, + blockchainId, + name, + value1, + value2, + value3, + ); } } }