From 437a1cf6e3f13915b9a973474526791a89746627 Mon Sep 17 00:00:00 2001 From: Alan Meekins Date: Wed, 14 Feb 2024 23:32:31 -0800 Subject: [PATCH 1/4] close #23 --- party/tasks/ble-monitor.js | 18 ++++++++++-------- src/rfpartyd.js | 27 +++++++++------------------ 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/party/tasks/ble-monitor.js b/party/tasks/ble-monitor.js index aa01cd3..1ee5a80 100644 --- a/party/tasks/ble-monitor.js +++ b/party/tasks/ble-monitor.js @@ -32,7 +32,7 @@ class BleMonitorTask extends ITask { this.scanning = false this.resetTimer = null this.scanTimer = null - this.scanIntervalMs = 30000 + this.scanIntervalMs = 60000 this.packetCount = 0 this.stationCount = 0 @@ -292,15 +292,15 @@ class BleMonitorTask extends ITask { // check if we heard within a scanInterval? - let diff = heard[0].diff( now ) - if(diff < this.scanIntervalMs){ + //let diff = heard[0].diff( now ) + //if(diff < this.scanIntervalMs){ // only store if rssi pushes power bounds up or down - if(device.rssi >= heard[1] && device.rssi <= heard[2]){ + //if(device.rssi >= heard[1] && device.rssi <= heard[2]){ //debug('\t','skip - within', device.address, heard[1], heard[2]) this.duplicateCount++ return - } + //} this.advMap[device.address][eirString64] = [ now, @@ -310,9 +310,9 @@ class BleMonitorTask extends ITask { //debug('bounds push') - } else { + /*} else { createCacheEntry() - } + }*/ } else { createCacheEntry() @@ -343,7 +343,9 @@ class BleMonitorTask extends ITask { const BleStation = this.context.party.factory.getFactory('ble_station') let deviceDoc = await BleAdv.indexBleDevice(this.context.party, dev, lastLocation) + debug('deviceDoc') let station = await BleStation.indexBleStation(this.context.party, deviceDoc) + debug('stationDoc') if(station.data.timebounds.first == station.data.timebounds.last){ this.stationCount++ @@ -357,4 +359,4 @@ class BleMonitorTask extends ITask { } } -module.exports = BleMonitorTask \ No newline at end of file +module.exports = BleMonitorTask diff --git a/src/rfpartyd.js b/src/rfpartyd.js index fad2c45..8e53e99 100644 --- a/src/rfpartyd.js +++ b/src/rfpartyd.js @@ -41,17 +41,11 @@ async function main(){ config: config, qbOptions: { debounce: false, - find_dedup: false, + find_dedup: true, timeout: false } }) - - - debug('partying') - - //const service = new Dataparty.IService({}, RFPartyService) - const runner = new Dataparty.ServiceRunnerNode({ party, service, useNative: true, @@ -61,7 +55,7 @@ async function main(){ const webHost = new Dataparty.ServiceHost({ runner, trust_proxy: false, - //listenUri: 'http://0.0.0.0:4000' + /*listenUri: 'http://0.0.0.0:4000'*/ }) @@ -76,6 +70,11 @@ async function main(){ await party.start() + + debug('compacting') + await party.db.compactDatabase() + debug('compacted') + await runner.start() await webHost.start() await unixSocketHost.start() @@ -94,17 +93,9 @@ async function main(){ } process.on('exit', exitHandler) - process.on('SIGINT', exitHandler); - // catches "kill pid" (for example: nodemon restart) - //process.on('SIGUSR1', exitHandler); - //process.on('SIGUSR2', exitHandler); + process.on('SIGINT', exitHandler) - // catches uncaught exceptions - //process.on('uncaughtException', exitHandler); - - console.log('started') - - //process.exit() + console.log('partying') } From 4b3117a73440b4b500a2733036032f3ee0e17d54 Mon Sep 17 00:00:00 2001 From: Alan Meekins Date: Fri, 16 Feb 2024 14:50:55 -0800 Subject: [PATCH 2/4] partial #23 --- package.json | 2 +- party/tasks/ble-monitor.js | 113 ++++++++++++++++++++++++++++++++----- scripts/start-snif.sh | 6 +- src/utils/delta-time.js | 21 +++++++ 4 files changed, 127 insertions(+), 15 deletions(-) create mode 100644 src/utils/delta-time.js diff --git a/package.json b/package.json index b0fa4ee..0e191a5 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "description": "rfparty monitor daemon", "main": "src/BLEMonitor.js", "scripts": { - "build": "node ./party/build.js", + "build": "mkdir -p dataparty; node ./party/build.js", "start": "node ./bin/rfparty-monitor", "rfpartyd": "DEBUG=rfpartyd,rfparty.task.*,Tasker.Task NOBLE_MULTI_ROLE=1 NOBLE_REPORT_ALL_HCI_EVENTS=1 node ./src/rfpartyd.js", "tail": "tail -f /data/rfparty/logs/active-log.txt" diff --git a/party/tasks/ble-monitor.js b/party/tasks/ble-monitor.js index 1ee5a80..433c45f 100644 --- a/party/tasks/ble-monitor.js +++ b/party/tasks/ble-monitor.js @@ -2,6 +2,8 @@ const HCIBindings = require('@abandonware/noble/lib/hci-socket/bindings') const Noble = require('@abandonware/noble/lib/noble') +const DeltaTime = require('../../src/utils/delta-time') + /*const noble = new Noble(new HCIBindings({ deviceId: 1, userChannel: true, @@ -19,6 +21,7 @@ const moment = require('moment') const debug = require('debug')('rfparty.task.ble-monitor') +const PENDING_LIMIT = 1 class BleMonitorTask extends ITask { @@ -33,7 +36,10 @@ class BleMonitorTask extends ITask { this.resetTimer = null this.scanTimer = null this.scanIntervalMs = 60000 + this.drainPendingIntervalMs = 800 + this.drainingPending = false + this.pendingCount = 0 this.packetCount = 0 this.stationCount = 0 this.duplicateCount = 0 @@ -89,15 +95,26 @@ class BleMonitorTask extends ITask { } handleScanTimer = async ()=>{ - debug('PROCESSED ', this.packetCount, '✉️ ', this.stationCount, '📡 ', 'duplicateCount=',this.duplicateCount) + debug('PROCESSED ', this.packetCount, '✉️ ', this.stationCount, '📡 ', 'duplicateCount=',this.duplicateCount, ' pending', this.pendingCount) debug('scan interval - state = ',noble.state) - if(noble.state == 'poweredOn' && this.scanning){ debug('scan interval - stopping scan') await this.stopScan() } + if(this.pendingCount > PENDING_LIMIT){ + this.drainingPending = true + + debug('draining pending') + + this.scanTimer = setTimeout(this.handleScanTimer, this.drainPendingIntervalMs) + return + } + else{ + this.drainingPending = false + } + if(noble.state != 'poweredOn'){ debug('skipping scan start, adapter not powered on') @@ -105,6 +122,36 @@ class BleMonitorTask extends ITask { return } + debug('cleaning cache') + + let now = moment() + + for(let dev in this.advMap){ + debug('\t', dev) + + for(let eir in this.advMap[dev]){ + + + let adv = this.advMap[dev][eir] + let diff = now.diff( adv[0] ) + + debug('\t\t', eir, ' diff', diff) + + if(diff >= this.scanIntervalMs){ + debug('delete') + delete this.advMap[dev][eir] + } + + } + + let devNode = this.advMap[dev] + + if(Object.keys(devNode).length < 1){ + debug('delete') + delete this.advMap[dev] + } + } + debug('scan interval - starting scan') await this.startScan() debug('scan interval - scanning') @@ -142,7 +189,7 @@ class BleMonitorTask extends ITask { debug('reset cancelled') } - this.advMap = {} + //this.advMap = {} if(!this.scanning || noble.state != 'poweredOn'){ return } @@ -269,9 +316,13 @@ class BleMonitorTask extends ITask { handleDeviceDiscovery = async (device)=>{ //debug(`device discovered: ${device.address} ${device.addressType} ${device.rssi} ${device.mtu} ${Object.keys(device)} ${JSON.stringify(device.advertisement)}`) + this.pendingCount++ //! device.advertisement.eir is Buffer + + let cacheCheckTime = new DeltaTime().start() + let eirString64 = device.advertisement.eir.toString('base64') @@ -285,20 +336,23 @@ class BleMonitorTask extends ITask { this.advMap[device.address] = {} } - this.advMap[device.address][eirString64] = [now, device.rssi, device.rssi] //! timestamp, rssi_low, rssi_high + let delayMs = Math.round(Math.random() * (this.scanIntervalMs*0.3)) + + this.advMap[device.address][eirString64] = [now.add(delayMs, 'ms'), device.rssi, device.rssi] //! timestamp, rssi_low, rssi_high } if(heard){ // check if we heard within a scanInterval? - //let diff = heard[0].diff( now ) - //if(diff < this.scanIntervalMs){ + let diff = now.diff( heard[0] ) + if(diff < this.scanIntervalMs){ // only store if rssi pushes power bounds up or down //if(device.rssi >= heard[1] && device.rssi <= heard[2]){ //debug('\t','skip - within', device.address, heard[1], heard[2]) this.duplicateCount++ + this.pendingCount-- return //} @@ -310,16 +364,22 @@ class BleMonitorTask extends ITask { //debug('bounds push') - /*} else { + } else { + debug('delete', device.address) + delete this.advMap[device.address] createCacheEntry() - }*/ + } } else { createCacheEntry() } + + cacheCheckTime.stop() + + let factoryTime = new DeltaTime().start() - debug(`device discovered: ${device.address} ${device.addressType} ${device.rssi} ${device.connectable} ${device.scannable} ${device.state} ${device.mtu} ${eirString64}`) + //debug(`device discovered: ${device.address} ${device.addressType} ${device.rssi} ${device.connectable} ${device.scannable} ${device.state} ${device.mtu} ${eirString64}`) //this.emit('address', device) @@ -337,23 +397,50 @@ class BleMonitorTask extends ITask { lastLocation = this.gpsdTask.lastLocation } - debug(lastLocation) + //debug(lastLocation) const BleAdv = this.context.party.factory.getFactory('ble_adv') const BleStation = this.context.party.factory.getFactory('ble_station') + factoryTime.stop() + + let deviceDocTime = new DeltaTime().start() let deviceDoc = await BleAdv.indexBleDevice(this.context.party, dev, lastLocation) - debug('deviceDoc') + deviceDocTime.stop() + + //debug('deviceDoc') + + let stationDocTime = new DeltaTime().start() let station = await BleStation.indexBleStation(this.context.party, deviceDoc) - debug('stationDoc') + stationDocTime.stop() + + //debug('stationDoc') + + const latencyReport = { + cache: cacheCheckTime.deltaMs, + factory: factoryTime.deltaMs, + device: deviceDocTime.deltaMs, + station: stationDocTime.deltaMs, + cached: Object.keys(this.advMap).length + } + + let isNew = false if(station.data.timebounds.first == station.data.timebounds.last){ this.stationCount++ - debug(station.data) + isNew = true + + //debug(station.data) //this.emit('station_count', this.stationCount) } + this.pendingCount-- + latencyReport.isNew = isNew + latencyReport.pending = this.pendingCount + + debug('latency - ', dev.id, JSON.stringify(latencyReport, null, 2)) + this.packetCount++ //this.emit('packet_count', this.packetCount) } diff --git a/scripts/start-snif.sh b/scripts/start-snif.sh index b356c85..676b000 100755 --- a/scripts/start-snif.sh +++ b/scripts/start-snif.sh @@ -6,6 +6,8 @@ sudo mkdir -p /data/rfparty/wifi sudo mkdir -p /data/rfparty/logs sudo mkdir -p /data/rfparty/agps +rm ~/.rfparty-monitor/unix-socket + activeLog=/data/rfparty/logs/active-log.txt previousLog=/data/rfparty/logs/previous-log.txt @@ -32,7 +34,9 @@ echo "starting blemonitor" sessionStamp=`date +"%Y%m%d-%H-%M-%S"` -npm start --prefix /usr/lib/rfparty-monitor/ &> /data/rfparty/logs/log.$sessionStamp.txt & +npm run rfpartyd --prefix /usr/lib/rfparty-monitor/ &> /data/rfparty/logs/log.$sessionStamp.txt & + +#npm start --prefix /usr/lib/rfparty-monitor/ &> /data/rfparty/logs/log.$sessionStamp.txt & sudo ln -s /data/rfparty/logs/log.$sessionStamp.txt /data/rfparty/logs/active-log.txt diff --git a/src/utils/delta-time.js b/src/utils/delta-time.js new file mode 100644 index 0000000..2da19d9 --- /dev/null +++ b/src/utils/delta-time.js @@ -0,0 +1,21 @@ +class DeltaTime { + constructor(){ + this.startMs = null + this.endMs = null + this.deltaMs = null + } + + start(){ + this.startMs = (new Date).getTime() + return this + } + + stop(){ + this.endMs = (new Date).getTime() + + this.deltaMs = this.endMs - this.startMs + } + + } + + module.exports = DeltaTime \ No newline at end of file From 7b80d17c03bf2773f61d5e6b3b561839c2deb73e Mon Sep 17 00:00:00 2001 From: Alan Meekins Date: Fri, 16 Feb 2024 15:00:18 -0800 Subject: [PATCH 3/4] less debug --- party/tasks/ble-monitor.js | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/party/tasks/ble-monitor.js b/party/tasks/ble-monitor.js index 433c45f..fc6fb3c 100644 --- a/party/tasks/ble-monitor.js +++ b/party/tasks/ble-monitor.js @@ -95,7 +95,7 @@ class BleMonitorTask extends ITask { } handleScanTimer = async ()=>{ - debug('PROCESSED ', this.packetCount, '✉️ ', this.stationCount, '📡 ', 'duplicateCount=',this.duplicateCount, ' pending', this.pendingCount) + debug('PROCESSED ', this.packetCount, '✉️ ', this.stationCount, '📡 ', 'duplicateCount=',this.duplicateCount, ' pending', this.pendingCount, ' cached', Object.keys(this.advMap).length) debug('scan interval - state = ',noble.state) if(noble.state == 'poweredOn' && this.scanning){ @@ -127,7 +127,7 @@ class BleMonitorTask extends ITask { let now = moment() for(let dev in this.advMap){ - debug('\t', dev) + //debug('\t', dev) for(let eir in this.advMap[dev]){ @@ -135,10 +135,10 @@ class BleMonitorTask extends ITask { let adv = this.advMap[dev][eir] let diff = now.diff( adv[0] ) - debug('\t\t', eir, ' diff', diff) + //debug('\t\t', eir, ' diff', diff) if(diff >= this.scanIntervalMs){ - debug('delete') + //debug('delete') delete this.advMap[dev][eir] } @@ -147,7 +147,7 @@ class BleMonitorTask extends ITask { let devNode = this.advMap[dev] if(Object.keys(devNode).length < 1){ - debug('delete') + //debug('delete') delete this.advMap[dev] } } @@ -365,7 +365,7 @@ class BleMonitorTask extends ITask { //debug('bounds push') } else { - debug('delete', device.address) + //debug('delete', device.address) delete this.advMap[device.address] createCacheEntry() } @@ -439,7 +439,7 @@ class BleMonitorTask extends ITask { latencyReport.isNew = isNew latencyReport.pending = this.pendingCount - debug('latency - ', dev.id, JSON.stringify(latencyReport, null, 2)) + //debug('latency - ', dev.id, JSON.stringify(latencyReport, null, 2)) this.packetCount++ //this.emit('packet_count', this.packetCount) From 7831453ef928974f745eec5272f200565c162198 Mon Sep 17 00:00:00 2001 From: Alan Meekins Date: Sat, 17 Feb 2024 18:01:30 -0800 Subject: [PATCH 4/4] seperate scan interval from observation window --- party/tasks/ble-monitor.js | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/party/tasks/ble-monitor.js b/party/tasks/ble-monitor.js index fc6fb3c..c92022f 100644 --- a/party/tasks/ble-monitor.js +++ b/party/tasks/ble-monitor.js @@ -36,6 +36,7 @@ class BleMonitorTask extends ITask { this.resetTimer = null this.scanTimer = null this.scanIntervalMs = 60000 + this.observationIntervalMs = 60000 this.drainPendingIntervalMs = 800 this.drainingPending = false @@ -76,16 +77,9 @@ class BleMonitorTask extends ITask { this.scanTimer = null } - debug('state', noble.state) + debug('noble state', noble.state) await this.handleScanTimer() - - /*this.scanTimer = setInterval(async ()=>{ - - }, this.scanIntervalMs)*/ - - //await this.stopScan() - //await this.startScan() } catch(err){ debug('error starting', err) @@ -95,11 +89,11 @@ class BleMonitorTask extends ITask { } handleScanTimer = async ()=>{ - debug('PROCESSED ', this.packetCount, '✉️ ', this.stationCount, '📡 ', 'duplicateCount=',this.duplicateCount, ' pending', this.pendingCount, ' cached', Object.keys(this.advMap).length) + debug('PROCESSED ', this.packetCount, '✉️ ', this.stationCount, '📡 ', 'duplicateCount=',this.duplicateCount, ' pending', this.pendingCount, ' cached', Object.keys(this.advMap).length, (new Date()).toLocaleString()) debug('scan interval - state = ',noble.state) if(noble.state == 'poweredOn' && this.scanning){ - debug('scan interval - stopping scan') + debug('scan interval - stopping scan', (new Date()).toLocaleString()) await this.stopScan() } @@ -132,12 +126,12 @@ class BleMonitorTask extends ITask { for(let eir in this.advMap[dev]){ - let adv = this.advMap[dev][eir] - let diff = now.diff( adv[0] ) + let heard = this.advMap[dev][eir] + let diff = now.diff( heard[3] ) //debug('\t\t', eir, ' diff', diff) - if(diff >= this.scanIntervalMs){ + if(diff >= this.observationIntervalMs){ //debug('delete') delete this.advMap[dev][eir] } @@ -152,9 +146,9 @@ class BleMonitorTask extends ITask { } } - debug('scan interval - starting scan') + debug('scan interval - starting scan', (new Date()).toLocaleString()) await this.startScan() - debug('scan interval - scanning') + debug('scan interval - scanning', (new Date()).toLocaleString()) this.scanTimer = setTimeout(this.handleScanTimer, this.scanIntervalMs) @@ -338,15 +332,15 @@ class BleMonitorTask extends ITask { let delayMs = Math.round(Math.random() * (this.scanIntervalMs*0.3)) - this.advMap[device.address][eirString64] = [now.add(delayMs, 'ms'), device.rssi, device.rssi] //! timestamp, rssi_low, rssi_high + this.advMap[device.address][eirString64] = [now.add(delayMs, 'ms'), device.rssi, device.rssi, now] //! cache_timestamp, rssi_low, rssi_high, last_observation } if(heard){ // check if we heard within a scanInterval? - let diff = now.diff( heard[0] ) - if(diff < this.scanIntervalMs){ + let diff = now.diff( heard[3] ) + if(diff < this.observationIntervalMs){ // only store if rssi pushes power bounds up or down //if(device.rssi >= heard[1] && device.rssi <= heard[2]){