Skip to content

Commit

Permalink
[TASK]: Add database support of influxdb2 client
Browse files Browse the repository at this point in the history
  • Loading branch information
genofire committed Apr 14, 2022
1 parent aa9d94f commit 0dcd55a
Show file tree
Hide file tree
Showing 10 changed files with 642 additions and 3 deletions.
33 changes: 33 additions & 0 deletions config_example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,39 @@ password = ""
#system = "productive"
#site = "ffhb"

# Save collected data to InfluxDB2.
# There are the following measurments:
# node: store node specific data i.e. clients memory, airtime
# link: store link tq between two interfaces of two different nodes with i.e. nodeid, address, hostname
# global: store global data, i.e. count of clients and nodes
# firmware: store the count of nodes tagged with firmware
# model: store the count of nodes tagged with hardware model
# autoupdater: store the count of autoupdate branch
[[database.connection.influxdb2]]
enable = false
address = "http://localhost:8086"
token = ""
organization_id = ""
bucket_default = ""

[database.connection.influxdb2.buckets]
#link = "yanic-temp"
#node = "yanic-temp"
#dhcp = "yanic-temp"
global = "yanic"
#firmware = "yanic-temp"
#model = "yanic-temp"
#autoupdater = "yanic-temp"

# Tagging of the data (optional)
[database.connection.influxdb2.tags]
# Tags used by Yanic would override the tags from this config
# nodeid, hostname, owner, model, firmware_base, firmware_release,frequency11g and frequency11a are tags which are already used
#tagname1 = "tagvalue 1"
# some useful e.g.:
#system = "productive"
#site = "ffhb"

# Graphite settings
[[database.connection.graphite]]
enable = false
Expand Down
1 change: 1 addition & 0 deletions database/all/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package all
import (
_ "yanic/database/graphite"
_ "yanic/database/influxdb"
_ "yanic/database/influxdb2"
_ "yanic/database/logging"
_ "yanic/database/respondd"
)
126 changes: 126 additions & 0 deletions database/influxdb2/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package influxdb

import (
"context"

influxdb "github.com/influxdata/influxdb-client-go/v2"
influxdbAPI "github.com/influxdata/influxdb-client-go/v2/api"

"github.com/bdlm/log"
"yanic/database"
)

const (
MeasurementLink = "link" // Measurement for per-link statistics
MeasurementNode = "node" // Measurement for per-node statistics
MeasurementDHCP = "dhcp" // Measurement for DHCP server statistics
MeasurementGlobal = "global" // Measurement for summarized global statistics
CounterMeasurementFirmware = "firmware" // Measurement for firmware statistics
CounterMeasurementModel = "model" // Measurement for model statistics
CounterMeasurementAutoupdater = "autoupdater" // Measurement for autoupdater
batchMaxSize = 1000
)

type Connection struct {
database.Connection
config Config
client influxdb.Client
writeAPI map[string]influxdbAPI.WriteAPI
}

type Config map[string]interface{}

func (c Config) Address() string {
return c["address"].(string)
}
func (c Config) Token() string {
if d, ok := c["token"]; ok {
return d.(string)
}
log.Panic("influxdb2 - no token given")
return ""
}
func (c Config) Organization() string {
if d, ok := c["organization_id"]; ok {
return d.(string)
}
return ""
}
func (c Config) Bucket(measurement string) string {
logger := log.WithFields(map[string]interface{}{
"organization_id": c.Organization(),
"address": c.Address(),
"measurement": measurement,
})
if d, ok := c["buckets"]; ok {
dMap := d.(map[string]interface{})
if d, ok := dMap[measurement]; ok {
bucket := d.(string)
logger.WithField("bucket", bucket).Info("get bucket for writeapi")
return bucket
}
if d, ok := c["bucket_default"]; ok {
bucket := d.(string)
logger.WithField("bucket", bucket).Info("get bucket for writeapi")
return bucket
}
}
if d, ok := c["bucket_default"]; ok {
bucket := d.(string)
logger.WithField("bucket", bucket).Info("get bucket for writeapi")
return bucket
}
logger.Panic("no bucket found for measurement")
return ""
}
func (c Config) Tags() map[string]string {
if c["tags"] != nil {
tags := make(map[string]string)
for k, v := range c["tags"].(map[string]interface{}) {
tags[k] = v.(string)
}
return tags
}
return nil
}

func init() {
database.RegisterAdapter("influxdb2", Connect)
}
func Connect(configuration map[string]interface{}) (database.Connection, error) {
config := Config(configuration)

// Make client
client := influxdb.NewClientWithOptions(config.Address(), config.Token(), influxdb.DefaultOptions().SetBatchSize(batchMaxSize))

ok, err := client.Ping(context.Background())
if !ok || err != nil {
return nil, err
}

writeAPI := map[string]influxdbAPI.WriteAPI{
MeasurementLink: client.WriteAPI(config.Organization(), config.Bucket(MeasurementLink)),
MeasurementNode: client.WriteAPI(config.Organization(), config.Bucket(MeasurementNode)),
MeasurementDHCP: client.WriteAPI(config.Organization(), config.Bucket(MeasurementDHCP)),
MeasurementGlobal: client.WriteAPI(config.Organization(), config.Bucket(MeasurementGlobal)),
CounterMeasurementFirmware: client.WriteAPI(config.Organization(), config.Bucket(CounterMeasurementFirmware)),
CounterMeasurementModel: client.WriteAPI(config.Organization(), config.Bucket(CounterMeasurementModel)),
CounterMeasurementAutoupdater: client.WriteAPI(config.Organization(), config.Bucket(CounterMeasurementAutoupdater)),
}

db := &Connection{
config: config,
client: client,
writeAPI: writeAPI,
}

return db, nil
}

// Close all connection and clean up
func (conn *Connection) Close() {
for _, api := range conn.writeAPI {
api.Flush()
}
conn.client.Close()
}
43 changes: 43 additions & 0 deletions database/influxdb2/database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package influxdb

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
)

func TestConnect(t *testing.T) {
assert := assert.New(t)

conn, err := Connect(map[string]interface{}{
"address": "",
"token": "",
"bucket_default": "all",
})
assert.NotNil(conn)
assert.Error(err)

conn, err = Connect(map[string]interface{}{
"address": "http://localhost",
"token": "",
"bucket_default": "all",
})
assert.NotNil(conn)
assert.Error(err)

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer srv.Close()

conn, err = Connect(map[string]interface{}{
"address": srv.URL,
"token": "atoken",
"bucket_default": "all",
})

assert.NotNil(conn)
assert.NoError(err)
}
79 changes: 79 additions & 0 deletions database/influxdb2/global.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package influxdb

import (
"time"

"yanic/runtime"

"github.com/bdlm/log"
influxdb "github.com/influxdata/influxdb-client-go/v2"
)

// InsertGlobals implementation of database
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time, site string, domain string) {
measurementGlobal := MeasurementGlobal
counterMeasurementModel := CounterMeasurementModel
counterMeasurementFirmware := CounterMeasurementFirmware
counterMeasurementAutoupdater := CounterMeasurementAutoupdater

if site != runtime.GLOBAL_SITE {
measurementGlobal += "_site"
counterMeasurementModel += "_site"
counterMeasurementFirmware += "_site"
counterMeasurementAutoupdater += "_site"
}
if domain != runtime.GLOBAL_DOMAIN {
measurementGlobal += "_domain"
counterMeasurementModel += "_domain"
counterMeasurementFirmware += "_domain"
counterMeasurementAutoupdater += "_domain"
}
p := influxdb.NewPoint(measurementGlobal,
conn.config.Tags(),
map[string]interface{}{
"nodes": stats.Nodes,
"gateways": stats.Gateways,
"clients.total": stats.Clients,
"clients.wifi": stats.ClientsWifi,
"clients.wifi24": stats.ClientsWifi24,
"clients.wifi5": stats.ClientsWifi5,
"clients.owe": stats.ClientsOwe,
"clients.owe24": stats.ClientsOwe24,
"clients.owe5": stats.ClientsOwe5,
},
time)

if site != runtime.GLOBAL_SITE {
p = p.AddTag("site", site)
}
if domain != runtime.GLOBAL_DOMAIN {
p = p.AddTag("domain", domain)
}
conn.writeAPI[MeasurementGlobal].WritePoint(p)

conn.addCounterMap(CounterMeasurementModel, counterMeasurementModel, stats.Models, time, site, domain)
conn.addCounterMap(CounterMeasurementFirmware, counterMeasurementFirmware, stats.Firmwares, time, site, domain)
conn.addCounterMap(CounterMeasurementAutoupdater, counterMeasurementAutoupdater, stats.Autoupdater, time, site, domain)
}

// Saves the values of a CounterMap in the database.
// The key are used as 'value' tag.
// The value is used as 'counter' field.
func (conn *Connection) addCounterMap(writeName, name string, m runtime.CounterMap, t time.Time, site string, domain string) {
writeAPI, ok := conn.writeAPI[writeName]
if !ok {
log.WithField("writeName", writeName).Panic("no writeAPI found for countermap")
}
for key, count := range m {
p := influxdb.NewPoint("stat",
conn.config.Tags(),
map[string]interface{}{
"count": count,
},
t).
AddTag("site", site).
AddTag("domain", domain).
AddTag("value", key)
writeAPI.WritePoint(p)
}
}
30 changes: 30 additions & 0 deletions database/influxdb2/link.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package influxdb

import (
"time"

"yanic/runtime"

influxdb "github.com/influxdata/influxdb-client-go/v2"
)

// InsertLink adds a link data point
func (conn *Connection) InsertLink(link *runtime.Link, t time.Time) {
p := influxdb.NewPoint(MeasurementLink,
conn.config.Tags(),
map[string]interface{}{
"tq": link.TQ * 100,
},
t).
AddTag("source.id", link.SourceID).
AddTag("source.addr", link.SourceAddress).
AddTag("target.id", link.TargetID).
AddTag("target.addr", link.TargetAddress)
if link.SourceHostname != "" {
p.AddTag("source.hostname", link.SourceHostname)
}
if link.TargetHostname != "" {
p.AddTag("target.hostname", link.TargetHostname)
}
conn.writeAPI[MeasurementLink].WritePoint(p)
}
Loading

0 comments on commit 0dcd55a

Please sign in to comment.