diff --git a/openwisp_monitoring/db/__init__.py b/openwisp_monitoring/db/__init__.py index 063d2d8f..64510ebc 100644 --- a/openwisp_monitoring/db/__init__.py +++ b/openwisp_monitoring/db/__init__.py @@ -1,7 +1,5 @@ from .backends import timeseries_db chart_query = timeseries_db.queries.chart_query -default_chart_query = timeseries_db.queries.default_chart_query -device_data_query = timeseries_db.queries.device_data_query -__all__ = ['timeseries_db', 'chart_query', 'default_chart_query', 'device_data_query'] +__all__ = ['timeseries_db', 'chart_query'] diff --git a/openwisp_monitoring/db/backends/__init__.py b/openwisp_monitoring/db/backends/__init__.py index be0cd843..518c469e 100644 --- a/openwisp_monitoring/db/backends/__init__.py +++ b/openwisp_monitoring/db/backends/__init__.py @@ -9,36 +9,19 @@ TIMESERIES_DB = getattr(settings, 'TIMESERIES_DATABASE', None) if not TIMESERIES_DB: - INFLUXDB_BACKEND = getattr(settings, 'INFLUXDB_BACKEND', 'openwisp_monitoring.db.backends.influxdb') - - if INFLUXDB_BACKEND == 'openwisp_monitoring.db.backends.influxdb': - # InfluxDB 1.x configuration - TIMESERIES_DB = { - 'BACKEND': INFLUXDB_BACKEND, - 'USER': getattr(settings, 'INFLUXDB_USER', 'openwisp'), - 'PASSWORD': getattr(settings, 'INFLUXDB_PASSWORD', 'openwisp'), - 'NAME': getattr(settings, 'INFLUXDB_DATABASE', 'openwisp2'), - 'HOST': getattr(settings, 'INFLUXDB_HOST', 'localhost'), - 'PORT': getattr(settings, 'INFLUXDB_PORT', '8086'), - } - elif INFLUXDB_BACKEND == 'openwisp_monitoring.db.backends.influxdb2': - # InfluxDB 2.x configuration - TIMESERIES_DB = { - 'BACKEND': INFLUXDB_BACKEND, - 'TOKEN': getattr(settings, 'INFLUXDB_TOKEN', 'dltiEmsmMKU__9SoBE0ingFdMTS3UksrESwIQDNtW_3WOgn8bQGdyYzPcx_aDtvZkqvR8RbMkwVVlzUJxpm62w=='), - 'ORG': getattr(settings, 'INFLUXDB_ORG', 'myorg'), - 'BUCKET': getattr(settings, 'INFLUXDB_BUCKET', 'mybucket'), - 'HOST': getattr(settings, 'INFLUXDB_HOST', 'localhost'), - 'PORT': getattr(settings, 'INFLUXDB_PORT', '8086'), - } - else: - logger.warning('Invalid INFLUXDB_BACKEND setting. Please check the documentation.') + TIMESERIES_DB = { + 'BACKEND': 'openwisp_monitoring.db.backends.influxdb', + 'USER': getattr(settings, 'INFLUXDB_USER', 'openwisp'), + 'PASSWORD': getattr(settings, 'INFLUXDB_PASSWORD', 'openwisp'), + 'NAME': getattr(settings, 'INFLUXDB_DATABASE', 'openwisp2'), + 'HOST': getattr(settings, 'INFLUXDB_HOST', 'localhost'), + 'PORT': getattr(settings, 'INFLUXDB_PORT', '8086'), + } + logger.warning( + 'The previous method to define Timeseries Database has been deprecated. Please refer to the docs:\n' + 'https://github.com/openwisp/openwisp-monitoring#setup-integrate-in-an-existing-django-project' + ) - if INFLUXDB_BACKEND == 'openwisp_monitoring.db.backends.influxdb': - logger.warning( - 'The previous method to define Timeseries Database has been deprecated. Please refer to the docs:\n' - 'https://github.com/openwisp/openwisp-monitoring#setup-integrate-in-an-existing-django-project' - ) def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None): """ @@ -47,8 +30,7 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None): """ try: assert 'BACKEND' in TIMESERIES_DB, 'BACKEND' - is_influxdb2 = '2' in TIMESERIES_DB['BACKEND'] - if is_influxdb2: + if 'BACKEND' in TIMESERIES_DB and '2' in TIMESERIES_DB['BACKEND']: # InfluxDB 2.x specific checks assert 'TOKEN' in TIMESERIES_DB, 'TOKEN' assert 'ORG' in TIMESERIES_DB, 'ORG' @@ -58,8 +40,6 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None): assert 'USER' in TIMESERIES_DB, 'USER' assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD' assert 'NAME' in TIMESERIES_DB, 'NAME' - assert 'HOST' in TIMESERIES_DB, 'HOST' - assert 'PORT' in TIMESERIES_DB, 'PORT' if module: return import_module(f'{backend_name}.{module}') else: @@ -82,18 +62,7 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None): "Try using 'openwisp_monitoring.db.backends.XXX', where XXX is one of:\n" f"{builtin_backends}" ) from e - else: - raise e -if '2' in TIMESERIES_DB['BACKEND']: - timeseries_db = load_backend_module(module='client').DatabaseClient( - bucket=TIMESERIES_DB['BUCKET'], - org=TIMESERIES_DB['ORG'], - token=TIMESERIES_DB['TOKEN'], - url=f"http://{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}", - ) - timeseries_db.queries = load_backend_module(TIMESERIES_DB['BACKEND'], module='queries') -else: - timeseries_db = load_backend_module(module='client').DatabaseClient() - timeseries_db.queries = load_backend_module(module='queries') \ No newline at end of file +timeseries_db = load_backend_module(module='client').DatabaseClient() +timeseries_db.queries = load_backend_module(module='queries') diff --git a/openwisp_monitoring/db/backends/influxdb/client.py b/openwisp_monitoring/db/backends/influxdb/client.py index 906769a0..583ce1fa 100644 --- a/openwisp_monitoring/db/backends/influxdb/client.py +++ b/openwisp_monitoring/db/backends/influxdb/client.py @@ -56,7 +56,6 @@ class DatabaseClient(object): backend_name = 'influxdb' def __init__(self, db_name=None): - self._db = None self.db_name = db_name or TIMESERIES_DB['NAME'] self.client_error = InfluxDBClientError @@ -255,7 +254,7 @@ def read(self, key, fields, tags, **kwargs): q = f'{q} LIMIT {limit}' return list(self.query(q, precision='s').get_points()) - def get_list_query(self, query, precision='s'): + def get_list_query(self, query, precision='s', **kwargs): result = self.query(query, precision=precision) if not len(result.keys()) or result.keys()[0][1] is None: return list(result.get_points()) @@ -426,6 +425,7 @@ def __transform_field(self, field, function, operation=None): def _get_top_fields( self, + default_query, query, params, chart_type, @@ -433,9 +433,15 @@ def _get_top_fields( number, time, timezone=settings.TIME_ZONE, + get_fields=True, ): + """ + Returns top fields if ``get_fields`` set to ``True`` (default) + else it returns points containing the top fields. + """ + q = default_query.replace('{field_name}', '{fields}') q = self.get_query( - query=query, + query=q, params=params, chart_type=chart_type, group_map=group_map, @@ -444,7 +450,7 @@ def _get_top_fields( time=time, timezone=timezone, ) - res = list(self.query(q, precision='s').get_points()) + res = self.get_list_query(q) if not res: return [] res = res[0] @@ -454,4 +460,31 @@ def _get_top_fields( keys = list(sorted_dict.keys()) keys.reverse() top = keys[0:number] - return [item.replace('sum_', '') for item in top] + top_fields = [item.replace('sum_', '') for item in top] + if get_fields: + return top_fields + query = self.get_query( + query=query, + params=params, + chart_type=chart_type, + group_map=group_map, + summary=True, + fields=top_fields, + time=time, + timezone=timezone, + ) + return self.get_list_query(query) + + def default_chart_query(self, tags): + q = "SELECT {field_name} FROM {key} WHERE time >= '{time}'" + if tags: + q += " AND content_type = '{content_type}' AND object_id = '{object_id}'" + return q + + def _device_data(self, key, tags, rp, **kwargs): + """ returns last snapshot of ``device_data`` """ + query = ( + f"SELECT data FROM {rp}.{key} WHERE pk = '{tags['pk']}' " + "ORDER BY time DESC LIMIT 1" + ) + return self.get_list_query(query, precision=None) diff --git a/openwisp_monitoring/db/backends/influxdb/queries.py b/openwisp_monitoring/db/backends/influxdb/queries.py index f3f64aa2..185677a9 100644 --- a/openwisp_monitoring/db/backends/influxdb/queries.py +++ b/openwisp_monitoring/db/backends/influxdb/queries.py @@ -144,12 +144,3 @@ ) }, } - -default_chart_query = [ - "SELECT {field_name} FROM {key} WHERE time >= '{time}' {end_date}", - " AND content_type = '{content_type}' AND object_id = '{object_id}'", -] - -device_data_query = ( - "SELECT data FROM {0}.{1} WHERE pk = '{2}' " "ORDER BY time DESC LIMIT 1" -) diff --git a/openwisp_monitoring/db/backends/influxdb2/client.py b/openwisp_monitoring/db/backends/influxdb2/client.py index 9af567a5..256c12d9 100644 --- a/openwisp_monitoring/db/backends/influxdb2/client.py +++ b/openwisp_monitoring/db/backends/influxdb2/client.py @@ -31,7 +31,7 @@ def __init__(self, bucket=None, org=None, token=None, url=None): self.bucket = bucket or TIMESERIES_DB['BUCKET'] self.org = org or TIMESERIES_DB['ORG'] self.token = token or TIMESERIES_DB['TOKEN'] - self.url = url + self.url = url or f'http://{TIMESERIES_DB["HOST"]}:{TIMESERIES_DB["PORT"]}' self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org) self.write_api = self.client.write_api(write_options=SYNCHRONOUS) self.query_api = self.client.query_api() @@ -85,63 +85,28 @@ def _get_timestamp(self, timestamp=None): def write(self, name, values, **kwargs): timestamp = self._get_timestamp(timestamp=kwargs.get('timestamp')) - try: - tags = kwargs.get('tags', {}) - if 'content_type' in kwargs: - tags['content_type'] = kwargs['content_type'] - if 'object_id' in kwargs: - tags['object_id'] = kwargs['object_id'] + try: point = { 'measurement': name, - 'tags': tags, + 'tags': kwargs.get('tags'), 'fields': values, 'time': timestamp, } - # import pdb; pdb.set_trace() - print(f"Writing point to InfluxDB: {point}") self.write_api.write(bucket=self.bucket, org=self.org, record=point) - print(f"Successfully wrote point to bucket {self.bucket}") except Exception as e: print(f"Error writing to InfluxDB: {e}") def batch_write(self, metric_data): - print(f"Batch writing to InfluxDB - Data: {metric_data}") points = [] for data in metric_data: timestamp = self._get_timestamp(timestamp=data.get('timestamp')) point = Point(data.get('name')).tag(**data.get('tags', {})).field(**data.get('values')).time(timestamp, WritePrecision.NS) - points.append(point) - + points.append(point) try: self.write_api.write(bucket=self.bucket, org=self.org, record=points) - logger.debug(f'Written batch of {len(points)} points to bucket {self.bucket}') except Exception as e: logger.error(f"Error writing batch to InfluxDB: {e}") - # def query(self, query): - # print(f"Executing query: {query}") - # try: - # tables = self.query_api.query(query) - # print(f"Query result: {tables}") - # result = [] - # for table in tables: - # for record in table.records: - # record_dict = { - # 'time': record.get_time(), - # 'measurement': record.get_measurement(), - # 'field': record.get_field(), - # 'value': record.get_value() - # } - # result.append(record_dict) - # print(f"Record: {record_dict}") - # print(f"Query result: {result}") - # if not result: - # print("Query returned no data") - # return result - # except Exception as e: - # logger.error(f"Error querying InfluxDB: {e}") - # print(f"Error querying InfluxDB: {e}") - # return [] def _format_date(self, date_str): if date_str is None or date_str == 'now()': return date_str @@ -193,7 +158,6 @@ def query(self, query): return result except Exception as e: logger.error(f"Error executing query: {e}") - return None def read(self, measurement, fields, tags, **kwargs): extra_fields = kwargs.get('extra_fields') diff --git a/openwisp_monitoring/device/base/models.py b/openwisp_monitoring/device/base/models.py index a9a5ced7..cdb9ba8e 100644 --- a/openwisp_monitoring/device/base/models.py +++ b/openwisp_monitoring/device/base/models.py @@ -26,7 +26,7 @@ from openwisp_monitoring.device.settings import get_critical_device_metrics from openwisp_utils.base import TimeStampedEditableModel -from ...db import device_data_query, timeseries_db +from ...db import timeseries_db from ...monitoring.signals import threshold_crossed from ...monitoring.tasks import _timeseries_write from ...settings import CACHE_TIMEOUT @@ -156,22 +156,12 @@ def data(self): """ if self.__data: return self.__data - - if settings.TIMESERIES_DATABASE['BACKEND'] == 'openwisp_monitoring.db.backends.influxdb2': - # InfluxDB 2.x query - q = device_data_query.format( - bucket=settings.TIMESERIES_DATABASE['BUCKET'], - measurement=self.__key, - object_id=self.pk - ) - else: - # InfluxDB 1.x query (kept for backward compatibility) - q = "SELECT data FROM {0}.{1} WHERE pk = '{2}' ORDER BY time DESC LIMIT 1".format(SHORT_RP, self.__key, self.pk) - cache_key = get_device_cache_key(device=self, context='current-data') points = cache.get(cache_key) if not points: - points = timeseries_db.get_list_query(q, precision=None) + points = timeseries_db._device_data( + rp=SHORT_RP, tags={'pk': self.pk}, key=self.__key, fields='data' + ) if not points: return None self.data_timestamp = points[0]['time'] @@ -391,11 +381,11 @@ def update_status(self, value): self.full_clean() self.save() # clear device management_ip when device is offline - # if self.status == 'critical' and app_settings.AUTO_CLEAR_MANAGEMENT_IP: - # self.device.management_ip = None - # self.device.save(update_fields=['management_ip']) + if self.status == '' and app_settings.AUTO_CLEAR_MANAGEMENT_IP: + self.device.management_ip = None + self.device.save(update_fields=['management_ip']) - # health_status_changed.send(sender=self.__class__, instance=self, status=value) + health_status_changed.send(sender=self.__class__, instance=self, status=value) @property def related_metrics(self): diff --git a/openwisp_monitoring/monitoring/base/models.py b/openwisp_monitoring/monitoring/base/models.py index 89c28915..f3cd7141 100644 --- a/openwisp_monitoring/monitoring/base/models.py +++ b/openwisp_monitoring/monitoring/base/models.py @@ -24,7 +24,7 @@ from openwisp_monitoring.monitoring.utils import clean_timeseries_data_key from openwisp_utils.base import TimeStampedEditableModel -from ...db import default_chart_query, timeseries_db +from ...db import timeseries_db from ...settings import CACHE_TIMEOUT, DEFAULT_CHART_TIME from ..configuration import ( CHART_CONFIGURATION_CHOICES, @@ -421,14 +421,9 @@ def write( current=current, ) pre_metric_write.send(**signal_kwargs) - if time is None: - timestamp = timezone.now() - elif isinstance(time, str): - timestamp = parse_date(time) - else: - timestamp = time - if timezone.is_naive(timestamp): - timestamp = timezone.make_aware(timestamp) + timestamp = time or timezone.now() + if isinstance(timestamp, str): + timestamp = parse_date(timestamp) options = dict( tags=self.tags, timestamp=timestamp.isoformat(), @@ -472,11 +467,6 @@ def batch_write(cls, raw_data): for metric, kwargs in raw_data: try: write_data.append(metric.write(**kwargs, write=False)) - if kwargs.get('check', True): - check_value = kwargs['value'] - if metric.alert_on_related_field and kwargs.get('extra_values'): - check_value = kwargs['extra_values'][metric.alert_field] - metric.check_threshold(check_value, kwargs.get('time'), kwargs.get('retention_policy'), kwargs.get('send_alert', True)) except ValueError as error: error_dict[metric.key] = str(error) _timeseries_batch_write(write_data) @@ -486,7 +476,7 @@ def batch_write(cls, raw_data): def read(self, **kwargs): """reads timeseries data""" return timeseries_db.read( - measurement=self.key, fields=self.field_name, tags=self.tags, **kwargs + key=self.key, fields=self.field_name, tags=self.tags, **kwargs ) def _notify_users(self, notification_type, alert_settings): @@ -627,10 +617,8 @@ def top_fields(self): @property def _default_query(self): - q = default_chart_query[0] - if self.metric.object_id: - q += default_chart_query[1] - return q + tags = True if self.metric.object_id else False + return timeseries_db.default_chart_query(tags) @classmethod def _get_group_map(cls, time=None): @@ -666,16 +654,6 @@ def _get_group_map(cls, time=None): group = '7d' custom_group_map.update({time: group}) return custom_group_map - - def _format_date(self, date_str): - if date_str is None or date_str == 'now()': - return date_str - try: - date = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S') - return date.strftime('%Y-%m-%dT%H:%M:%SZ') - except ValueError: - # If the date_str is not in the expected format, return it as is - return date_str def get_query( self, @@ -695,13 +673,8 @@ def get_query( params = self._get_query_params(time, start_date, end_date) params.update(additional_params) params.update({'start_date': start_date, 'end_date': end_date}) - params.update({ - 'start_date': self._format_date(start_date) if start_date else None, - 'end_date': self._format_date(end_date) if end_date else None - }) if not params.get('organization_id') and self.config_dict.get('__all__', False): params['organization_id'] = ['__all__'] - params['measurement'] = params.get('measurement') or params.get('key') return timeseries_db.get_query( self.type, params, @@ -718,10 +691,10 @@ def get_top_fields(self, number): Returns list of top ``number`` of fields (highest sum) of a measurement in the specified time range (descending order). """ - q = self._default_query.replace('{field_name}', '{fields}') params = self._get_query_params(self.DEFAULT_TIME) return timeseries_db._get_top_fields( - query=q, + default_query=self._default_query, + query=self.get_query(), chart_type=self.type, group_map=self._get_group_map(params['days']), number=number, @@ -732,7 +705,6 @@ def get_top_fields(self, number): def _get_query_params(self, time, start_date=None, end_date=None): m = self.metric params = dict( - measurement=m.key, field_name=m.field_name, key=m.key, time=self._get_time(time, start_date, end_date), @@ -780,26 +752,31 @@ def read( ): additional_query_kwargs = additional_query_kwargs or {} traces = {} - x = [] + if x_axys: + x = [] try: query_kwargs = dict( time=time, timezone=timezone, start_date=start_date, end_date=end_date ) query_kwargs.update(additional_query_kwargs) if self.top_fields: - fields = self.get_top_fields(self.top_fields) - data_query = self.get_query(fields=fields, **query_kwargs) - summary_query = self.get_query( - fields=fields, summary=True, **query_kwargs + points = summary = timeseries_db._get_top_fields( + default_query=self._default_query, + chart_type=self.type, + group_map=self.GROUP_MAP, + number=self.top_fields, + params=self._get_query_params(self.DEFAULT_TIME), + time=time, + query=self.query, + get_fields=False, ) else: data_query = self.get_query(**query_kwargs) summary_query = self.get_query(summary=True, **query_kwargs) - points = timeseries_db.get_list_query(data_query) - logging.debug(f"Data points: {points}") - logging.debug(f"Data query: {data_query}") - summary = timeseries_db.get_list_query(summary_query) - logging.debug(f"Summary query: {summary_query}") + points = timeseries_db.get_list_query(data_query, key=self.metric.key) + summary = timeseries_db.get_list_query( + summary_query, key=self.metric.key + ) except timeseries_db.client_error as e: logging.error(e, exc_info=True) raise e @@ -809,31 +786,31 @@ def read( logging.warning(f"Point missing time value: {point}") continue for key, value in point.items(): - if key in ['time', '_time']: + if key == 'time': continue traces.setdefault(key, []) if decimal_places and isinstance(value, (int, float)): value = self._round(value, decimal_places) traces[key].append(value) - if isinstance(time_value, str): - time = datetime.fromisoformat(time_value.rstrip('Z')).replace(tzinfo=utc).astimezone(tz(timezone)) - else: - time = datetime.fromtimestamp(time_value, tz=tz(timezone)) - formatted_time = time.strftime('%Y-%m-%d %H:%M') - x.append(formatted_time) + time = datetime.fromtimestamp(point['time'], tz=tz(timezone)).strftime( + '%Y-%m-%d %H:%M' + ) + if x_axys: + x.append(time) # prepare result to be returned # (transform chart data so its order is not random) result = {'traces': sorted(traces.items())} - result['x'] = x + if x_axys: + result['x'] = x # add summary if len(summary) > 0: result['summary'] = {} for key, value in summary[0].items(): - if key in ['time', '_time']: + if key == 'time': continue if not timeseries_db.validate_query(self.query): value = None - elif value is not None: + elif value: value = self._round(value, decimal_places) result['summary'][key] = value return result diff --git a/openwisp_monitoring/monitoring/migrations/__init__.py b/openwisp_monitoring/monitoring/migrations/__init__.py index 74784001..d8198ede 100644 --- a/openwisp_monitoring/monitoring/migrations/__init__.py +++ b/openwisp_monitoring/monitoring/migrations/__init__.py @@ -1,10 +1,7 @@ -from asyncio.log import logger - import swapper from django.contrib.auth.models import Permission from openwisp_controller.migrations import create_default_permissions, get_swapped_model -from django.db import transaction def assign_permissions_to_groups(apps, schema_editor): @@ -75,42 +72,30 @@ def create_general_metrics(apps, schema_editor): Chart = swapper.load_model('monitoring', 'Chart') Metric = swapper.load_model('monitoring', 'Metric') - # Temporarily disable the validation rules for the Chart model - original_full_clean = Chart.full_clean - - def disabled_full_clean(self): - pass - - Chart.full_clean = disabled_full_clean - - try: - with transaction.atomic(): - metric, created = Metric._get_or_create( - configuration='general_clients', - name='General Clients', - key='wifi_clients', - object_id=None, - content_type_id=None, - ) - if created: - chart = Chart(metric=metric, configuration='gen_wifi_clients') - logger.debug(f'Creating chart with configuration: {chart.configuration}') - chart.save() - - metric, created = Metric._get_or_create( - configuration='general_traffic', - name='General Traffic', - key='traffic', - object_id=None, - content_type_id=None, - ) - if created: - chart = Chart(metric=metric, configuration='general_traffic') - logger.debug(f'Creating chart with configuration: {chart.configuration}') - chart.save() - finally: - # Restore the original full_clean method - Chart.full_clean = original_full_clean + + metric, created = Metric._get_or_create( + configuration='general_clients', + name='General Clients', + key='wifi_clients', + object_id=None, + content_type_id=None, + ) + if created: + chart = Chart(metric=metric, configuration='gen_wifi_clients') + logger.debug(f'Creating chart with configuration: {chart.configuration}') + chart.save() + + metric, created = Metric._get_or_create( + configuration='general_traffic', + name='General Traffic', + key='traffic', + object_id=None, + content_type_id=None, + ) + if created: + chart = Chart(metric=metric, configuration='general_traffic') + logger.debug(f'Creating chart with configuration: {chart.configuration}') + chart.save() def delete_general_metrics(apps, schema_editor): Metric = apps.get_model('monitoring', 'Metric')