Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ble scan performance #32

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "rfparty monitor daemon",
"main": "src/BLEMonitor.js",
"scripts": {
"build": "node ./party/build.js",
"build": "mkdir -p dataparty; node ./party/build.js",
"start": "node ./bin/rfparty-monitor",
"rfpartyd": "DEBUG=rfpartyd,rfparty.task.*,Tasker.Task NOBLE_MULTI_ROLE=1 NOBLE_REPORT_ALL_HCI_EVENTS=1 node ./src/rfpartyd.js",
"tail": "tail -f /data/rfparty/logs/active-log.txt"
Expand Down
131 changes: 107 additions & 24 deletions party/tasks/ble-monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
const HCIBindings = require('@abandonware/noble/lib/hci-socket/bindings')
const Noble = require('@abandonware/noble/lib/noble')

const DeltaTime = require('../../src/utils/delta-time')

/*const noble = new Noble(new HCIBindings({
deviceId: 1,
userChannel: true,
Expand All @@ -19,6 +21,7 @@ const moment = require('moment')

const debug = require('debug')('rfparty.task.ble-monitor')

const PENDING_LIMIT = 1

class BleMonitorTask extends ITask {

Expand All @@ -32,8 +35,12 @@ class BleMonitorTask extends ITask {
this.scanning = false
this.resetTimer = null
this.scanTimer = null
this.scanIntervalMs = 30000
this.scanIntervalMs = 60000
this.observationIntervalMs = 60000
this.drainPendingIntervalMs = 800

this.drainingPending = false
this.pendingCount = 0
this.packetCount = 0
this.stationCount = 0
this.duplicateCount = 0
Expand Down Expand Up @@ -70,16 +77,9 @@ class BleMonitorTask extends ITask {
this.scanTimer = null
}

debug('state', noble.state)
debug('noble state', noble.state)

await this.handleScanTimer()

/*this.scanTimer = setInterval(async ()=>{

}, this.scanIntervalMs)*/

//await this.stopScan()
//await this.startScan()
}
catch(err){
debug('error starting', err)
Expand All @@ -89,25 +89,66 @@ class BleMonitorTask extends ITask {
}

handleScanTimer = async ()=>{
debug('PROCESSED ', this.packetCount, '✉️ ', this.stationCount, '📡 ', 'duplicateCount=',this.duplicateCount)
debug('PROCESSED ', this.packetCount, '✉️ ', this.stationCount, '📡 ', 'duplicateCount=',this.duplicateCount, ' pending', this.pendingCount, ' cached', Object.keys(this.advMap).length, (new Date()).toLocaleString())
debug('scan interval - state = ',noble.state)


if(noble.state == 'poweredOn' && this.scanning){
debug('scan interval - stopping scan')
debug('scan interval - stopping scan', (new Date()).toLocaleString())
await this.stopScan()
}

if(this.pendingCount > PENDING_LIMIT){
this.drainingPending = true

debug('draining pending')

this.scanTimer = setTimeout(this.handleScanTimer, this.drainPendingIntervalMs)
return
}
else{
this.drainingPending = false
}


if(noble.state != 'poweredOn'){
debug('skipping scan start, adapter not powered on')
this.scanTimer = setTimeout(this.handleScanTimer, this.scanIntervalMs)
return
}

debug('scan interval - starting scan')
debug('cleaning cache')

let now = moment()

for(let dev in this.advMap){
//debug('\t', dev)

for(let eir in this.advMap[dev]){


let heard = this.advMap[dev][eir]
let diff = now.diff( heard[3] )

//debug('\t\t', eir, ' diff', diff)

if(diff >= this.observationIntervalMs){
//debug('delete')
delete this.advMap[dev][eir]
}

}

let devNode = this.advMap[dev]

if(Object.keys(devNode).length < 1){
//debug('delete')
delete this.advMap[dev]
}
}

debug('scan interval - starting scan', (new Date()).toLocaleString())
await this.startScan()
debug('scan interval - scanning')
debug('scan interval - scanning', (new Date()).toLocaleString())


this.scanTimer = setTimeout(this.handleScanTimer, this.scanIntervalMs)
Expand Down Expand Up @@ -142,7 +183,7 @@ class BleMonitorTask extends ITask {
debug('reset cancelled')
}

this.advMap = {}
//this.advMap = {}

if(!this.scanning || noble.state != 'poweredOn'){ return }

Expand Down Expand Up @@ -269,9 +310,13 @@ class BleMonitorTask extends ITask {
handleDeviceDiscovery = async (device)=>{
//debug(`device discovered: ${device.address} ${device.addressType} ${device.rssi} ${device.mtu} ${Object.keys(device)} ${JSON.stringify(device.advertisement)}`)

this.pendingCount++

//! device.advertisement.eir is Buffer


let cacheCheckTime = new DeltaTime().start()

let eirString64 = device.advertisement.eir.toString('base64')


Expand All @@ -285,22 +330,25 @@ class BleMonitorTask extends ITask {
this.advMap[device.address] = {}
}

this.advMap[device.address][eirString64] = [now, device.rssi, device.rssi] //! timestamp, rssi_low, rssi_high
let delayMs = Math.round(Math.random() * (this.scanIntervalMs*0.3))

this.advMap[device.address][eirString64] = [now.add(delayMs, 'ms'), device.rssi, device.rssi, now] //! cache_timestamp, rssi_low, rssi_high, last_observation
}

if(heard){

// check if we heard within a scanInterval?

let diff = heard[0].diff( now )
if(diff < this.scanIntervalMs){
let diff = now.diff( heard[3] )
if(diff < this.observationIntervalMs){
// only store if rssi pushes power bounds up or down

if(device.rssi >= heard[1] && device.rssi <= heard[2]){
//if(device.rssi >= heard[1] && device.rssi <= heard[2]){
//debug('\t','skip - within', device.address, heard[1], heard[2])
this.duplicateCount++
this.pendingCount--
return
}
//}

this.advMap[device.address][eirString64] = [
now,
Expand All @@ -311,15 +359,21 @@ class BleMonitorTask extends ITask {
//debug('bounds push')

} else {
//debug('delete', device.address)
delete this.advMap[device.address]
createCacheEntry()
}

} else {
createCacheEntry()
}

cacheCheckTime.stop()

let factoryTime = new DeltaTime().start()

debug(`device discovered: ${device.address} ${device.addressType} ${device.rssi} ${device.connectable} ${device.scannable} ${device.state} ${device.mtu} ${eirString64}`)

//debug(`device discovered: ${device.address} ${device.addressType} ${device.rssi} ${device.connectable} ${device.scannable} ${device.state} ${device.mtu} ${eirString64}`)
//this.emit('address', device)


Expand All @@ -337,24 +391,53 @@ class BleMonitorTask extends ITask {
lastLocation = this.gpsdTask.lastLocation
}

debug(lastLocation)
//debug(lastLocation)

const BleAdv = this.context.party.factory.getFactory('ble_adv')
const BleStation = this.context.party.factory.getFactory('ble_station')

factoryTime.stop()

let deviceDocTime = new DeltaTime().start()
let deviceDoc = await BleAdv.indexBleDevice(this.context.party, dev, lastLocation)
deviceDocTime.stop()

//debug('deviceDoc')

let stationDocTime = new DeltaTime().start()
let station = await BleStation.indexBleStation(this.context.party, deviceDoc)
stationDocTime.stop()

//debug('stationDoc')

const latencyReport = {
cache: cacheCheckTime.deltaMs,
factory: factoryTime.deltaMs,
device: deviceDocTime.deltaMs,
station: stationDocTime.deltaMs,
cached: Object.keys(this.advMap).length
}

let isNew = false

if(station.data.timebounds.first == station.data.timebounds.last){
this.stationCount++

debug(station.data)
isNew = true

//debug(station.data)
//this.emit('station_count', this.stationCount)
}

this.pendingCount--
latencyReport.isNew = isNew
latencyReport.pending = this.pendingCount

//debug('latency - ', dev.id, JSON.stringify(latencyReport, null, 2))

this.packetCount++
//this.emit('packet_count', this.packetCount)
}
}

module.exports = BleMonitorTask
module.exports = BleMonitorTask
6 changes: 5 additions & 1 deletion scripts/start-snif.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ sudo mkdir -p /data/rfparty/wifi
sudo mkdir -p /data/rfparty/logs
sudo mkdir -p /data/rfparty/agps

rm ~/.rfparty-monitor/unix-socket

activeLog=/data/rfparty/logs/active-log.txt
previousLog=/data/rfparty/logs/previous-log.txt

Expand All @@ -32,7 +34,9 @@ echo "starting blemonitor"

sessionStamp=`date +"%Y%m%d-%H-%M-%S"`

npm start --prefix /usr/lib/rfparty-monitor/ &> /data/rfparty/logs/log.$sessionStamp.txt &
npm run rfpartyd --prefix /usr/lib/rfparty-monitor/ &> /data/rfparty/logs/log.$sessionStamp.txt &

#npm start --prefix /usr/lib/rfparty-monitor/ &> /data/rfparty/logs/log.$sessionStamp.txt &

sudo ln -s /data/rfparty/logs/log.$sessionStamp.txt /data/rfparty/logs/active-log.txt

Expand Down
27 changes: 9 additions & 18 deletions src/rfpartyd.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,11 @@ async function main(){
config: config,
qbOptions: {
debounce: false,
find_dedup: false,
find_dedup: true,
timeout: false
}
})



debug('partying')

//const service = new Dataparty.IService({}, RFPartyService)

const runner = new Dataparty.ServiceRunnerNode({
party, service,
useNative: true,
Expand All @@ -61,7 +55,7 @@ async function main(){
const webHost = new Dataparty.ServiceHost({
runner,
trust_proxy: false,
//listenUri: 'http://0.0.0.0:4000'
/*listenUri: 'http://0.0.0.0:4000'*/
})


Expand All @@ -76,6 +70,11 @@ async function main(){


await party.start()

debug('compacting')
await party.db.compactDatabase()
debug('compacted')

await runner.start()
await webHost.start()
await unixSocketHost.start()
Expand All @@ -94,17 +93,9 @@ async function main(){
}

process.on('exit', exitHandler)
process.on('SIGINT', exitHandler);
// catches "kill pid" (for example: nodemon restart)
//process.on('SIGUSR1', exitHandler);
//process.on('SIGUSR2', exitHandler);
process.on('SIGINT', exitHandler)

// catches uncaught exceptions
//process.on('uncaughtException', exitHandler);

console.log('started')

//process.exit()
console.log('partying')
}


Expand Down
21 changes: 21 additions & 0 deletions src/utils/delta-time.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
class DeltaTime {
constructor(){
this.startMs = null
this.endMs = null
this.deltaMs = null
}

start(){
this.startMs = (new Date).getTime()
return this
}

stop(){
this.endMs = (new Date).getTime()

this.deltaMs = this.endMs - this.startMs
}

}

module.exports = DeltaTime