Skip to content

Commit

Permalink
[monitoring] Avoided if statements #274
Browse files Browse the repository at this point in the history
Fixes #274
  • Loading branch information
praptisharma28 committed Jun 29, 2024
1 parent cfc9b58 commit 61300a6
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 217 deletions.
4 changes: 1 addition & 3 deletions openwisp_monitoring/db/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from .backends import timeseries_db

chart_query = timeseries_db.queries.chart_query
default_chart_query = timeseries_db.queries.default_chart_query
device_data_query = timeseries_db.queries.device_data_query

__all__ = ['timeseries_db', 'chart_query', 'default_chart_query', 'device_data_query']
__all__ = ['timeseries_db', 'chart_query']
61 changes: 15 additions & 46 deletions openwisp_monitoring/db/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,19 @@

TIMESERIES_DB = getattr(settings, 'TIMESERIES_DATABASE', None)
if not TIMESERIES_DB:
INFLUXDB_BACKEND = getattr(settings, 'INFLUXDB_BACKEND', 'openwisp_monitoring.db.backends.influxdb')

if INFLUXDB_BACKEND == 'openwisp_monitoring.db.backends.influxdb':
# InfluxDB 1.x configuration
TIMESERIES_DB = {
'BACKEND': INFLUXDB_BACKEND,
'USER': getattr(settings, 'INFLUXDB_USER', 'openwisp'),
'PASSWORD': getattr(settings, 'INFLUXDB_PASSWORD', 'openwisp'),
'NAME': getattr(settings, 'INFLUXDB_DATABASE', 'openwisp2'),
'HOST': getattr(settings, 'INFLUXDB_HOST', 'localhost'),
'PORT': getattr(settings, 'INFLUXDB_PORT', '8086'),
}
elif INFLUXDB_BACKEND == 'openwisp_monitoring.db.backends.influxdb2':
# InfluxDB 2.x configuration
TIMESERIES_DB = {
'BACKEND': INFLUXDB_BACKEND,
'TOKEN': getattr(settings, 'INFLUXDB_TOKEN', 'dltiEmsmMKU__9SoBE0ingFdMTS3UksrESwIQDNtW_3WOgn8bQGdyYzPcx_aDtvZkqvR8RbMkwVVlzUJxpm62w=='),
'ORG': getattr(settings, 'INFLUXDB_ORG', 'myorg'),
'BUCKET': getattr(settings, 'INFLUXDB_BUCKET', 'mybucket'),
'HOST': getattr(settings, 'INFLUXDB_HOST', 'localhost'),
'PORT': getattr(settings, 'INFLUXDB_PORT', '8086'),
}
else:
logger.warning('Invalid INFLUXDB_BACKEND setting. Please check the documentation.')
TIMESERIES_DB = {
'BACKEND': 'openwisp_monitoring.db.backends.influxdb',
'USER': getattr(settings, 'INFLUXDB_USER', 'openwisp'),
'PASSWORD': getattr(settings, 'INFLUXDB_PASSWORD', 'openwisp'),
'NAME': getattr(settings, 'INFLUXDB_DATABASE', 'openwisp2'),
'HOST': getattr(settings, 'INFLUXDB_HOST', 'localhost'),
'PORT': getattr(settings, 'INFLUXDB_PORT', '8086'),
}
logger.warning(
'The previous method to define Timeseries Database has been deprecated. Please refer to the docs:\n'
'https://github.com/openwisp/openwisp-monitoring#setup-integrate-in-an-existing-django-project'
)

if INFLUXDB_BACKEND == 'openwisp_monitoring.db.backends.influxdb':
logger.warning(
'The previous method to define Timeseries Database has been deprecated. Please refer to the docs:\n'
'https://github.com/openwisp/openwisp-monitoring#setup-integrate-in-an-existing-django-project'
)

def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
"""
Expand All @@ -47,8 +30,7 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
"""
try:
assert 'BACKEND' in TIMESERIES_DB, 'BACKEND'
is_influxdb2 = '2' in TIMESERIES_DB['BACKEND']
if is_influxdb2:
if 'BACKEND' in TIMESERIES_DB and '2' in TIMESERIES_DB['BACKEND']:
# InfluxDB 2.x specific checks
assert 'TOKEN' in TIMESERIES_DB, 'TOKEN'
assert 'ORG' in TIMESERIES_DB, 'ORG'
Expand All @@ -58,8 +40,6 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
assert 'USER' in TIMESERIES_DB, 'USER'
assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD'
assert 'NAME' in TIMESERIES_DB, 'NAME'
assert 'HOST' in TIMESERIES_DB, 'HOST'
assert 'PORT' in TIMESERIES_DB, 'PORT'
if module:
return import_module(f'{backend_name}.{module}')
else:
Expand All @@ -82,18 +62,7 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
"Try using 'openwisp_monitoring.db.backends.XXX', where XXX is one of:\n"
f"{builtin_backends}"
) from e
else:
raise e


if '2' in TIMESERIES_DB['BACKEND']:
timeseries_db = load_backend_module(module='client').DatabaseClient(
bucket=TIMESERIES_DB['BUCKET'],
org=TIMESERIES_DB['ORG'],
token=TIMESERIES_DB['TOKEN'],
url=f"http://{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}",
)
timeseries_db.queries = load_backend_module(TIMESERIES_DB['BACKEND'], module='queries')
else:
timeseries_db = load_backend_module(module='client').DatabaseClient()
timeseries_db.queries = load_backend_module(module='queries')
timeseries_db = load_backend_module(module='client').DatabaseClient()
timeseries_db.queries = load_backend_module(module='queries')
43 changes: 38 additions & 5 deletions openwisp_monitoring/db/backends/influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class DatabaseClient(object):
backend_name = 'influxdb'

def __init__(self, db_name=None):
self._db = None
self.db_name = db_name or TIMESERIES_DB['NAME']
self.client_error = InfluxDBClientError

Expand Down Expand Up @@ -255,7 +254,7 @@ def read(self, key, fields, tags, **kwargs):
q = f'{q} LIMIT {limit}'
return list(self.query(q, precision='s').get_points())

def get_list_query(self, query, precision='s'):
def get_list_query(self, query, precision='s', **kwargs):
result = self.query(query, precision=precision)
if not len(result.keys()) or result.keys()[0][1] is None:
return list(result.get_points())
Expand Down Expand Up @@ -426,16 +425,23 @@ def __transform_field(self, field, function, operation=None):

def _get_top_fields(
self,
default_query,
query,
params,
chart_type,
group_map,
number,
time,
timezone=settings.TIME_ZONE,
get_fields=True,
):
"""
Returns top fields if ``get_fields`` set to ``True`` (default)
else it returns points containing the top fields.
"""
q = default_query.replace('{field_name}', '{fields}')
q = self.get_query(
query=query,
query=q,
params=params,
chart_type=chart_type,
group_map=group_map,
Expand All @@ -444,7 +450,7 @@ def _get_top_fields(
time=time,
timezone=timezone,
)
res = list(self.query(q, precision='s').get_points())
res = self.get_list_query(q)
if not res:
return []
res = res[0]
Expand All @@ -454,4 +460,31 @@ def _get_top_fields(
keys = list(sorted_dict.keys())
keys.reverse()
top = keys[0:number]
return [item.replace('sum_', '') for item in top]
top_fields = [item.replace('sum_', '') for item in top]
if get_fields:
return top_fields
query = self.get_query(
query=query,
params=params,
chart_type=chart_type,
group_map=group_map,
summary=True,
fields=top_fields,
time=time,
timezone=timezone,
)
return self.get_list_query(query)

def default_chart_query(self, tags):
q = "SELECT {field_name} FROM {key} WHERE time >= '{time}'"
if tags:
q += " AND content_type = '{content_type}' AND object_id = '{object_id}'"
return q

def _device_data(self, key, tags, rp, **kwargs):
""" returns last snapshot of ``device_data`` """
query = (
f"SELECT data FROM {rp}.{key} WHERE pk = '{tags['pk']}' "
"ORDER BY time DESC LIMIT 1"
)
return self.get_list_query(query, precision=None)
9 changes: 0 additions & 9 deletions openwisp_monitoring/db/backends/influxdb/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,3 @@
)
},
}

default_chart_query = [
"SELECT {field_name} FROM {key} WHERE time >= '{time}' {end_date}",
" AND content_type = '{content_type}' AND object_id = '{object_id}'",
]

device_data_query = (
"SELECT data FROM {0}.{1} WHERE pk = '{2}' " "ORDER BY time DESC LIMIT 1"
)
44 changes: 4 additions & 40 deletions openwisp_monitoring/db/backends/influxdb2/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(self, bucket=None, org=None, token=None, url=None):
self.bucket = bucket or TIMESERIES_DB['BUCKET']
self.org = org or TIMESERIES_DB['ORG']
self.token = token or TIMESERIES_DB['TOKEN']
self.url = url
self.url = url or f'http://{TIMESERIES_DB["HOST"]}:{TIMESERIES_DB["PORT"]}'
self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.query_api = self.client.query_api()
Expand Down Expand Up @@ -85,63 +85,28 @@ def _get_timestamp(self, timestamp=None):

def write(self, name, values, **kwargs):
timestamp = self._get_timestamp(timestamp=kwargs.get('timestamp'))
try:
tags = kwargs.get('tags', {})
if 'content_type' in kwargs:
tags['content_type'] = kwargs['content_type']
if 'object_id' in kwargs:
tags['object_id'] = kwargs['object_id']
try:
point = {
'measurement': name,
'tags': tags,
'tags': kwargs.get('tags'),
'fields': values,
'time': timestamp,
}
# import pdb; pdb.set_trace()
print(f"Writing point to InfluxDB: {point}")
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
print(f"Successfully wrote point to bucket {self.bucket}")
except Exception as e:
print(f"Error writing to InfluxDB: {e}")

def batch_write(self, metric_data):
print(f"Batch writing to InfluxDB - Data: {metric_data}")
points = []
for data in metric_data:
timestamp = self._get_timestamp(timestamp=data.get('timestamp'))
point = Point(data.get('name')).tag(**data.get('tags', {})).field(**data.get('values')).time(timestamp, WritePrecision.NS)
points.append(point)

points.append(point)
try:
self.write_api.write(bucket=self.bucket, org=self.org, record=points)
logger.debug(f'Written batch of {len(points)} points to bucket {self.bucket}')
except Exception as e:
logger.error(f"Error writing batch to InfluxDB: {e}")

# def query(self, query):
# print(f"Executing query: {query}")
# try:
# tables = self.query_api.query(query)
# print(f"Query result: {tables}")
# result = []
# for table in tables:
# for record in table.records:
# record_dict = {
# 'time': record.get_time(),
# 'measurement': record.get_measurement(),
# 'field': record.get_field(),
# 'value': record.get_value()
# }
# result.append(record_dict)
# print(f"Record: {record_dict}")
# print(f"Query result: {result}")
# if not result:
# print("Query returned no data")
# return result
# except Exception as e:
# logger.error(f"Error querying InfluxDB: {e}")
# print(f"Error querying InfluxDB: {e}")
# return []
def _format_date(self, date_str):
if date_str is None or date_str == 'now()':
return date_str
Expand Down Expand Up @@ -193,7 +158,6 @@ def query(self, query):
return result
except Exception as e:
logger.error(f"Error executing query: {e}")
return None

def read(self, measurement, fields, tags, **kwargs):
extra_fields = kwargs.get('extra_fields')
Expand Down
26 changes: 8 additions & 18 deletions openwisp_monitoring/device/base/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from openwisp_monitoring.device.settings import get_critical_device_metrics
from openwisp_utils.base import TimeStampedEditableModel

from ...db import device_data_query, timeseries_db
from ...db import timeseries_db
from ...monitoring.signals import threshold_crossed
from ...monitoring.tasks import _timeseries_write
from ...settings import CACHE_TIMEOUT
Expand Down Expand Up @@ -156,22 +156,12 @@ def data(self):
"""
if self.__data:
return self.__data

if settings.TIMESERIES_DATABASE['BACKEND'] == 'openwisp_monitoring.db.backends.influxdb2':
# InfluxDB 2.x query
q = device_data_query.format(
bucket=settings.TIMESERIES_DATABASE['BUCKET'],
measurement=self.__key,
object_id=self.pk
)
else:
# InfluxDB 1.x query (kept for backward compatibility)
q = "SELECT data FROM {0}.{1} WHERE pk = '{2}' ORDER BY time DESC LIMIT 1".format(SHORT_RP, self.__key, self.pk)

cache_key = get_device_cache_key(device=self, context='current-data')
points = cache.get(cache_key)
if not points:
points = timeseries_db.get_list_query(q, precision=None)
points = timeseries_db._device_data(
rp=SHORT_RP, tags={'pk': self.pk}, key=self.__key, fields='data'
)
if not points:
return None
self.data_timestamp = points[0]['time']
Expand Down Expand Up @@ -391,11 +381,11 @@ def update_status(self, value):
self.full_clean()
self.save()
# clear device management_ip when device is offline
# if self.status == 'critical' and app_settings.AUTO_CLEAR_MANAGEMENT_IP:
# self.device.management_ip = None
# self.device.save(update_fields=['management_ip'])
if self.status == '' and app_settings.AUTO_CLEAR_MANAGEMENT_IP:
self.device.management_ip = None
self.device.save(update_fields=['management_ip'])

# health_status_changed.send(sender=self.__class__, instance=self, status=value)
health_status_changed.send(sender=self.__class__, instance=self, status=value)

@property
def related_metrics(self):
Expand Down
Loading

0 comments on commit 61300a6

Please sign in to comment.