diff --git a/openwisp_monitoring/db/backends/influxdb2/client.py b/openwisp_monitoring/db/backends/influxdb2/client.py index 256c12d9..e0327a82 100644 --- a/openwisp_monitoring/db/backends/influxdb2/client.py +++ b/openwisp_monitoring/db/backends/influxdb2/client.py @@ -92,7 +92,9 @@ def write(self, name, values, **kwargs): 'fields': values, 'time': timestamp, } + print(f"Writing point to InfluxDB: {point}") self.write_api.write(bucket=self.bucket, org=self.org, record=point) + print("Successfully wrote point to InfluxDB") except Exception as e: print(f"Error writing to InfluxDB: {e}") @@ -117,114 +119,135 @@ def _format_date(self, date_str): # If the date_str is not in the expected format, return it as is return date_str - def get_query(self, chart_type, params, time, group_map, summary=False, fields=None, query=None, timezone=settings.TIME_ZONE): - print(f"get_query called with params: {params}") - measurement = params.get('measurement') or params.get('key') - if not measurement or measurement == 'None': - logger.error(f"Invalid or missing measurement in params: {params}") - return None - - start_date = self._format_date(params.get('start_date', f'-{time}')) - end_date = self._format_date(params.get('end_date', 'now()')) - content_type = params.get('content_type') - object_id = params.get('object_id') - - - window = group_map.get(time, '1h') - - flux_query = f''' - from(bucket: "{self.bucket}") - |> range(start: {start_date}, stop: {end_date}) - |> filter(fn: (r) => r["_measurement"] == "{measurement}") - ''' - - if content_type and object_id: - flux_query += f' |> filter(fn: (r) => r.content_type == "{content_type}" and r.object_id == "{object_id}")\n' - - if fields: - field_filters = ' or '.join([f'r["_field"] == "{field}"' for field in fields]) - flux_query += f' |> filter(fn: (r) => {field_filters})\n' - - flux_query += f' |> aggregateWindow(every: {window}, fn: mean, createEmpty: false)\n' - flux_query += ' |> yield(name: "mean")' - - print(f"Generated Flux query: {flux_query}") - return flux_query - def query(self, query): print(f"Executing query: {query}") try: result = self.query_api.query(query) + print(f"Query result: {result}") return result except Exception as e: + print(f"Error executing query: {e}") logger.error(f"Error executing query: {e}") - - def read(self, measurement, fields, tags, **kwargs): + return [] + + def _parse_query_result(self, result): + print("Parsing query result") + parsed_result = [] + for table in result: + for record in table.records: + parsed_record = { + 'time': record.get_time().isoformat(), + } + for key, value in record.values.items(): + if key not in ['_time', '_start', '_stop', '_measurement']: + parsed_record[key] = value + parsed_result.append(parsed_record) + print(f"Parsed result: {parsed_result}") + return parsed_result + + def read(self, key, fields, tags, **kwargs): extra_fields = kwargs.get('extra_fields') - since = kwargs.get('since', '-30d') + since = kwargs.get('since', '-30d') # Default to last 30 days if not specified order = kwargs.get('order') limit = kwargs.get('limit') + bucket = self.bucket - flux_query = f''' - from(bucket: "{self.bucket}") - |> range(start: {since}) - |> filter(fn: (r) => r._measurement == "{measurement}") - ''' - if fields and fields != '*': - field_filters = ' or '.join([f'r._field == "{field}"' for field in fields.split(', ')]) - flux_query += f' |> filter(fn: (r) => {field_filters})' - + # Start building the Flux query + flux_query = f'from(bucket:"{bucket}")' + + # Add time range + flux_query += f'\n |> range(start: {since})' + + # Filter by measurement (key) + flux_query += f'\n |> filter(fn: (r) => r["_measurement"] == "{key}")' + + # Filter by fields + if fields != '*': + if extra_fields and extra_fields != '*': + all_fields = [fields] + extra_fields if isinstance(extra_fields, list) else [fields, extra_fields] + field_filter = ' or '.join([f'r["_field"] == "{field}"' for field in all_fields]) + else: + field_filter = f'r["_field"] == "{fields}"' + flux_query += f'\n |> filter(fn: (r) => {field_filter})' + + # Filter by tags if tags: tag_filters = ' and '.join([f'r["{tag}"] == "{value}"' for tag, value in tags.items()]) - flux_query += f' |> filter(fn: (r) => {tag_filters})' - - flux_query += ''' - |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") - |> map(fn: (r) => ({r with _value: float(v: r._value)})) - |> keep(columns: ["_time", "_value", "_field", "content_type", "object_id"]) - |> rename(columns: {_time: "time"}) - ''' + flux_query += f'\n |> filter(fn: (r) => {tag_filters})' + # Add ordering if order: - if order == 'time': - flux_query += ' |> sort(columns: ["time"], desc: false)' - elif order == '-time': - flux_query += ' |> sort(columns: ["time"], desc: true)' + if order in ['time', '-time']: + desc = 'true' if order == '-time' else 'false' + flux_query += f'\n |> sort(columns: ["_time"], desc: {desc})' else: - raise ValueError(f'Invalid order "{order}" passed.\nYou may pass "time" / "-time" to get result sorted in ascending /descending order respectively.') - + raise self.client_error( + f'Invalid order "{order}" passed.\nYou may pass "time" / "-time" to get ' + 'result sorted in ascending /descending order respectively.' + ) + + # Add limit if limit: - flux_query += f' |> limit(n: {limit})' + flux_query += f'\n |> limit(n: {limit})' - return self.query(flux_query) + # Pivot the result to make it similar to InfluxDB 1.x output + flux_query += '\n |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' - def get_list_query(self, query, precision=None): - print(f"get_list_query called with query: {query}") - result = self.query(query) - result_points = [] - - if result is None: - print("Query returned None") - return result_points + # Execute the query + try: + result = self.query_api.query(flux_query) + return self._parse_read_result(result) + except Exception as e: + logger.error(f"Error executing read query: {e}") + return [] + def _parse_read_result(self, result): + parsed_result = [] for table in result: for record in table.records: - time = record.get_time() - if precision is not None: - # Truncate the time based on the specified precision - time = time.isoformat()[:precision] - else: - time = time.isoformat() - - values = {col: record.values.get(col) for col in record.values if col != '_time'} - values['time'] = time - values['_value'] = record.get_value() - values['_field'] = record.get_field() - result_points.append(values) - - print(f"get_list_query returned {len(result_points)} points") - print(f"Processed result points: {result_points}") - return result_points + parsed_record = { + 'time': record.get_time().isoformat(), + } + for key, value in record.values.items(): + if key not in ['_time', '_start', '_stop', '_measurement']: + parsed_record[key] = value + parsed_result.append(parsed_record) + return parsed_result + + def get_ping_data_query(self, bucket, start, stop, device_ids): + device_filter = ' or '.join([f'r["object_id"] == "{id}"' for id in device_ids]) + query = f''' + from(bucket: "{bucket}") + |> range(start: {start}, stop: {stop}) + |> filter(fn: (r) => r["_measurement"] == "ping") + |> filter(fn: (r) => r["_field"] == "loss" or r["_field"] == "reachable" or r["_field"] == "rtt_avg" or r["_field"] == "rtt_max" or r["_field"] == "rtt_min") + |> filter(fn: (r) => r["content_type"] == "config.device") + |> filter(fn: (r) => {device_filter}) + |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false) + |> yield(name: "mean") + ''' + return query + + def execute_query(self, query): + try: + result = self.query_api.query(query) + return self._parse_result(result) + except Exception as e: + logger.error(f"Error executing query: {e}") + return [] + + def _parse_result(self, result): + parsed_result = [] + for table in result: + for record in table.records: + parsed_record = { + 'time': record.get_time().isoformat(), + 'device_id': record.values.get('object_id'), + 'field': record.values.get('_field'), + 'value': record.values.get('_value') + } + parsed_result.append(parsed_record) + return parsed_result def delete_metric_data(self, key=None, tags=None): start = "1970-01-01T00:00:00Z" @@ -274,7 +297,17 @@ def _get_filter_query(self, field, items): filters.append(f'r["{field}"] == "{item}"') return f'|> filter(fn: (r) => {" or ".join(filters)})' - # def get_query(self, chart_type, params, time, group_map, summary=False, fields=None, query=None, timezone=settings.TIME_ZONE): + def get_query( + self, + chart_type, + params, + time, + group_map, + summary=False, + fields=None, + query=None, + timezone=settings.TIME_ZONE + ): bucket = self.bucket measurement = params.get('measurement') if not measurement or measurement == 'None': @@ -283,117 +316,49 @@ def _get_filter_query(self, field, items): start_date = params.get('start_date') end_date = params.get('end_date') + + # Set default values for start and end dates if they're None + if start_date is None: + start_date = f'-{time}' + if end_date is None: + end_date = 'now()' + content_type = params.get('content_type') object_id = params.get('object_id') - print(f"get_query called with params: {params}") - import pdb; pdb.set_trace() - def format_time(time_str): - if time_str: - try: - if isinstance(time_str, str): - # Try parsing as ISO format first - try: - dt = datetime.fromisoformat(time_str.replace('Z', '+00:00')) - except ValueError: - # If that fails, try parsing as a different format - dt = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S') - else: - dt = time_str - return dt.strftime('%Y-%m-%dT%H:%M:%SZ') - except Exception as e: - print(f"Error parsing time: {e}") - return None - - start_date = format_time(start_date) if start_date else f'-{time}' - end_date = format_time(end_date) if end_date else 'now()' + field_name = params.get('field_name') or fields + + object_id_filter = f' and r.object_id == "{object_id}"' if object_id else "" flux_query = f''' from(bucket: "{bucket}") |> range(start: {start_date}, stop: {end_date}) |> filter(fn: (r) => r._measurement == "{measurement}") - |> filter(fn: (r) => r.content_type == "{content_type}" and r.object_id == "{object_id}") + |> filter(fn: (r) => r.content_type == "{content_type}"{object_id_filter}) ''' - if not summary: - window = group_map.get(time, '1h') - flux_query += f'|> aggregateWindow(every: {window}, fn: mean, createEmpty: false)' + if field_name: + if isinstance(field_name, (list, tuple)): + field_filter = ' or '.join([f'r._field == "{field}"' for field in field_name]) + else: + field_filter = f'r._field == "{field_name}"' + flux_query += f' |> filter(fn: (r) => {field_filter})\n' - flux_query += ''' - |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") - ''' + logger.debug(f"Time: {time}") + logger.debug(f"Group map: {group_map}") + window = group_map.get(time, '1h') + logger.debug(f"Window: {window}") - if summary: - flux_query += '|> last()' + if not summary: + flux_query += f' |> aggregateWindow(every: {window}, fn: mean, createEmpty: false)\n' + else: + flux_query += ' |> last()\n' - flux_query += '|> yield(name: "result")' + flux_query += ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")\n' + flux_query += ' |> yield(name: "result")' - print(f"Generated Flux query: {flux_query}") + logger.debug(f"Generated Flux query: {flux_query}") return flux_query - # def get_query( - # self, - # chart_type, - # params, - # time_range, - # group_map, - # summary=False, - # fields=None, - # query=None, - # timezone=settings.TIME_ZONE, - # ): - # flux_query = f'from(bucket: "{self.bucket}")' - - # def format_date(date): - # if date is None: - # return None - # if isinstance(date, str): - # try: - # dt = datetime.strptime(date, "%Y-%m-%d %H:%M:%S") - # return str(int(dt.timestamp())) - # except ValueError: - # return date - # if isinstance(date, datetime): - # return str(int(date.timestamp())) - # return str(date) - - # start_date = format_date(params.get('start_date')) - # end_date = format_date(params.get('end_date')) - - # if start_date: - # flux_query += f' |> range(start: {start_date}' - # else: - # flux_query += f' |> range(start: -{time_range}' - - # if end_date: - # flux_query += f', stop: {end_date})' - # else: - # flux_query += ')' - - # if 'key' in params: - # flux_query += f' |> filter(fn: (r) => r._measurement == "{params["key"]}")' - - # if fields and fields != '*': - # field_filters = ' or '.join([f'r._field == "{field.strip()}"' for field in fields.split(',')]) - # flux_query += f' |> filter(fn: (r) => {field_filters})' - - # if 'content_type' in params and 'object_id' in params: - # flux_query += f' |> filter(fn: (r) => r.content_type == "{params["content_type"]}" and r.object_id == "{params["object_id"]}")' - - # window_period = group_map.get(time_range, '1h') - # if chart_type in ['line', 'stackedbar']: - # flux_query += f' |> aggregateWindow(every: {window_period}, fn: mean, createEmpty: false)' - - # flux_query += ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' - - # if summary: - # flux_query += ' |> last()' - - # flux_query = f'import "timezone"\n\noption location = timezone.location(name: "{timezone}")\n\n{flux_query}' - - # flux_query += ' |> yield(name: "result")' - - # print(f"Generated Flux query: {flux_query}") # Debug print - # return flux_query - + def _fields(self, fields, query, field_name): matches = re.search(self._fields_regex, query) if not matches and not fields: @@ -436,6 +401,60 @@ def _get_top_fields(self, query, params, chart_type, group_map, number, time, ti top_fields = [record["_field"] for table in result for record in table.records] return top_fields + def default_chart_query(self, tags): + q = f''' + from(bucket: "{self.bucket}") + |> range(start: {{time}}) + |> filter(fn: (r) => r._measurement == "{{key}}") + |> filter(fn: (r) => r._field == "{{field_name}}") + ''' + if tags: + q += ''' + |> filter(fn: (r) => r.content_type == "{{content_type}}") + |> filter(fn: (r) => r.object_id == "{{object_id}}") + ''' + if '{{end_date}}' in tags: + q += ' |> range(stop: {{end_date}})' + return q + + def _device_data(self, key, tags, rp, **kwargs): + """ returns last snapshot of ``device_data`` """ + query = f''' + from(bucket: "{self.bucket}") + |> range(start: -30d) + |> filter(fn: (r) => r._measurement == "ping") + |> filter(fn: (r) => r.pk == "{tags['pk']}") + |> last() + |> yield(name: "last") + ''' + print(f"Modified _device_data query: {query}") + return self.get_list_query(query, precision=None) + + def get_list_query(self, query, precision='s', **kwargs): + print(f"get_list_query called with query: {query}") + result = self.query(query) + parsed_result = self._parse_query_result(result) if result else [] + print(f"get_list_query result: {parsed_result}") + return parsed_result + + def get_device_data_structure(self, device_pk): + query = f''' + from(bucket: "{self.bucket}") + |> range(start: -30d) + |> filter(fn: (r) => r._measurement == "ping") + |> filter(fn: (r) => r.pk == "{device_pk}") + |> limit(n: 1) + ''' + print(f"Checking device data structure: {query}") + result = self.query(query) + if result: + for table in result: + for record in table.records: + print(f"Sample record: {record}") + print(f"Available fields: {record.values.keys()}") + else: + print("No data found for this device") + def close(self): self.client.close() diff --git a/openwisp_monitoring/db/backends/influxdb2/queries.py b/openwisp_monitoring/db/backends/influxdb2/queries.py index 057ec6e3..2b62d03c 100644 --- a/openwisp_monitoring/db/backends/influxdb2/queries.py +++ b/openwisp_monitoring/db/backends/influxdb2/queries.py @@ -1,285 +1,3 @@ -# chart_query = { -# 'uptime': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "{field_name}")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> mean()' -# ' |> map(fn: (r) => ({ r with _value: r._value * 100.0 }))' -# ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' -# ' |> yield(name: "uptime")' -# ) -# }, -# 'packet_loss': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "loss")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' -# ' |> yield(name: "packet_loss")' -# ) -# }, -# 'rtt': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "rtt_avg" or r._field == "rtt_max" or r._field == "rtt_min")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' -# ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' -# ' |> yield(name: "rtt")' -# ) -# }, -# 'wifi_clients': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "{field_name}")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> filter(fn: (r) => r.ifname == "{ifname}")' -# ' |> group()' -# ' |> distinct()' -# ' |> count()' -# ' |> set(key: "_field", value: "wifi_clients")' -# ' |> aggregateWindow(every: 1d, fn: max)' -# ) -# }, -# 'general_wifi_clients': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "{field_name}")' -# ' |> filter(fn: (r) => r.organization_id == "{organization_id}")' -# ' |> filter(fn: (r) => r.location_id == "{location_id}")' -# ' |> filter(fn: (r) => r.floorplan_id == "{floorplan_id}")' -# ' |> group()' -# ' |> distinct()' -# ' |> count()' -# ' |> set(key: "_field", value: "wifi_clients")' -# ' |> aggregateWindow(every: 1d, fn: max)' -# ) -# }, -# 'traffic': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "tx_bytes" or r._field == "rx_bytes")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> filter(fn: (r) => r.ifname == "{ifname}")' -# ' |> sum()' -# ' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))' -# ' |> aggregateWindow(every: 1d, fn: sum, createEmpty: false)' -# ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' -# ' |> rename(columns: {tx_bytes: "upload", rx_bytes: "download"})' -# ' |> yield(name: "traffic")' -# ) -# }, -# 'general_traffic': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "tx_bytes" or r._field == "rx_bytes")' -# ' |> filter(fn: (r) => r.organization_id == "{organization_id}")' -# ' |> filter(fn: (r) => r.location_id == "{location_id}")' -# ' |> filter(fn: (r) => r.floorplan_id == "{floorplan_id}")' -# ' |> filter(fn: (r) => r.ifname == "{ifname}")' -# ' |> sum()' -# ' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))' -# ' |> aggregateWindow(every: 1d, fn: sum, createEmpty: false)' -# ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' -# ' |> rename(columns: {tx_bytes: "upload", rx_bytes: "download"})' -# ' |> yield(name: "general_traffic")' -# ) -# }, -# 'memory': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "percent_used")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' -# ' |> yield(name: "memory_usage")' -# ) -# }, -# 'cpu': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "cpu_usage")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' -# ' |> yield(name: "CPU_load")' -# ) -# }, -# 'disk': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "used_disk")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' -# ' |> yield(name: "disk_usage")' -# ) -# }, -# 'signal_strength': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "signal_strength" or r._field == "signal_power")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' -# ' |> map(fn: (r) => ({ r with _value: float(v: int(v: r._value)) }))' -# ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' -# ' |> yield(name: "signal_strength")' -# ) -# }, -# 'signal_quality': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "signal_quality" or r._field == "snr")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' -# ' |> map(fn: (r) => ({ r with _value: float(v: int(v: r._value)) }))' -# ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' -# ' |> yield(name: "signal_quality")' -# ) -# }, -# 'access_tech': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "access_tech")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> aggregateWindow(every: 1d, fn: (column) => mode(column: "_value"), createEmpty: false)' -# ' |> yield(name: "access_tech")' -# ) -# }, -# 'bandwidth': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "sent_bps_tcp" or r._field == "sent_bps_udp")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' -# ' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))' -# ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' -# ' |> rename(columns: {sent_bps_tcp: "TCP", sent_bps_udp: "UDP"})' -# ' |> yield(name: "bandwidth")' -# ) -# }, -# 'transfer': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "sent_bytes_tcp" or r._field == "sent_bytes_udp")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> sum()' -# ' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))' -# ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' -# ' |> rename(columns: {sent_bytes_tcp: "TCP", sent_bytes_udp: "UDP"})' -# ' |> yield(name: "transfer")' -# ) -# }, -# 'retransmits': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "retransmits")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' -# ' |> yield(name: "retransmits")' -# ) -# }, -# 'jitter': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "jitter")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' -# ' |> yield(name: "jitter")' -# ) -# }, -# 'datagram': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "lost_packets" or r._field == "total_packets")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' -# ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' -# ' |> rename(columns: {lost_packets: "lost_datagram", total_packets: "total_datagram"})' -# ' |> yield(name: "datagram")' -# ) -# }, -# 'datagram_loss': { -# 'flux': ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "lost_percent")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' -# ' |> yield(name: "datagram_loss")' -# ) -# } -# } - -# default_chart_query = ( -# 'from(bucket: "mybucket")' -# ' |> range(start: {time}, stop: {end_date})' -# ' |> filter(fn: (r) => r._measurement == "{key}")' -# ' |> filter(fn: (r) => r._field == "{field_name}")' -# ' |> filter(fn: (r) => r.content_type == "{content_type}")' -# ' |> filter(fn: (r) => r.object_id == "{object_id}")' -# ) - -# device_data_query = ( -# 'from(bucket: "mybucket")' -# ' |> range(start: -30d)' -# ' |> filter(fn: (r) => r._measurement == "{0}")' -# ' |> filter(fn: (r) => r.pk == "{1}")' -# ' |> last()' -# ) - chart_query = { 'uptime': { 'influxdb2': ( @@ -544,21 +262,3 @@ ) } } - -default_chart_query = ( - 'from(bucket: "mybucket")' - ' |> range(start: {time}, stop: {end_date})' - ' |> filter(fn: (r) => r._measurement == "{measurement}")' - ' |> filter(fn: (r) => r._field == "{field_name}")' - ' |> filter(fn: (r) => r.content_type == "{content_type}")' - ' |> filter(fn: (r) => r.object_id == "{object_id}")' -) - -device_data_query = ( - 'from(bucket: "{bucket}")' - ' |> range(start: -30d)' - ' |> filter(fn: (r) => r._measurement == "{measurement}")' - ' |> filter(fn: (r) => r.object_id == "{object_id}")' - ' |> last()' - ' |> yield(name: "last")' -) diff --git a/openwisp_monitoring/device/base/models.py b/openwisp_monitoring/device/base/models.py index cdb9ba8e..52e45e74 100644 --- a/openwisp_monitoring/device/base/models.py +++ b/openwisp_monitoring/device/base/models.py @@ -384,7 +384,6 @@ def update_status(self, value): if self.status == '' and app_settings.AUTO_CLEAR_MANAGEMENT_IP: self.device.management_ip = None self.device.save(update_fields=['management_ip']) - health_status_changed.send(sender=self.__class__, instance=self, status=value) @property diff --git a/openwisp_monitoring/monitoring/base/models.py b/openwisp_monitoring/monitoring/base/models.py index f3cd7141..247815df 100644 --- a/openwisp_monitoring/monitoring/base/models.py +++ b/openwisp_monitoring/monitoring/base/models.py @@ -4,6 +4,9 @@ from copy import deepcopy from datetime import date, datetime, timedelta +from dateutil.parser import parse +from django.utils.timezone import make_aware, is_aware + from cache_memoize import cache_memoize from dateutil.parser import parse as parse_date from django.conf import settings @@ -672,20 +675,25 @@ def get_query( additional_params = additional_params or {} params = self._get_query_params(time, start_date, end_date) params.update(additional_params) - params.update({'start_date': start_date, 'end_date': end_date}) + params.update({ + 'start_date': start_date, + 'end_date': end_date, + 'measurement': self.config_dict.get('measurement', self.metric.key), + 'field_name': fields or self.config_dict.get('field_name'), + }) if not params.get('organization_id') and self.config_dict.get('__all__', False): params['organization_id'] = ['__all__'] return timeseries_db.get_query( - self.type, - params, - time, - self._get_group_map(time), - summary, - fields, - query, - timezone, + chart_type=self.type, + params=params, + time=time, + group_map=self._get_group_map(time), + summary=summary, + fields=fields, + query=query, + timezone=timezone, ) - + def get_top_fields(self, number): """ Returns list of top ``number`` of fields (highest sum) of a @@ -752,8 +760,9 @@ def read( ): additional_query_kwargs = additional_query_kwargs or {} traces = {} - if x_axys: - x = [] + x = [] + result = {'traces': [], 'summary': {}} # Initialize result dictionary + try: query_kwargs = dict( time=time, timezone=timezone, start_date=start_date, end_date=end_date @@ -778,42 +787,81 @@ def read( summary_query, key=self.metric.key ) except timeseries_db.client_error as e: - logging.error(e, exc_info=True) - raise e + logger.error(f"Error fetching data: {e}", exc_info=True) + raise + for point in points: time_value = point.get('time') or point.get('_time') if not time_value: - logging.warning(f"Point missing time value: {point}") + logger.warning(f"Point missing time value: {point}") + continue + + try: + formatted_time = self._parse_and_format_time(time_value, timezone) + except ValueError as e: + logger.warning(f"Error parsing time value: {time_value}. Error: {e}") continue + for key, value in point.items(): - if key == 'time': + if key in ('time', '_time', 'result', 'table', 'content_type', 'object_id'): continue traces.setdefault(key, []) - if decimal_places and isinstance(value, (int, float)): + if decimal_places is not None and value is not None: value = self._round(value, decimal_places) traces[key].append(value) - time = datetime.fromtimestamp(point['time'], tz=tz(timezone)).strftime( - '%Y-%m-%d %H:%M' - ) + if x_axys: - x.append(time) - # prepare result to be returned - # (transform chart data so its order is not random) - result = {'traces': sorted(traces.items())} + x.append(formatted_time) + + # Prepare result + result['traces'] = sorted(traces.items()) if x_axys: result['x'] = x - # add summary - if len(summary) > 0: - result['summary'] = {} + + # Handle summary calculation + if summary: for key, value in summary[0].items(): - if key == 'time': + if key in ('time', '_time', 'result', 'table', 'content_type', 'object_id'): continue if not timeseries_db.validate_query(self.query): value = None - elif value: + elif value is not None: value = self._round(value, decimal_places) - result['summary'][key] = value + result['summary'][key] = 'N/A' if value is None else value + return result + + def _round(self, value, decimal_places): + logger.debug(f"Rounding value: {value}, type: {type(value)}") + if value is None: + logger.debug("Value is None, returning None") + return None + try: + float_value = float(value) + rounded = round(float_value, decimal_places) + logger.debug(f"Rounded value: {rounded}") + return rounded + except (ValueError, TypeError) as e: + logger.warning(f"Could not round value: {value}. Error: {e}") + return value + + def _parse_and_format_time(self, time_str, timezone): + time_obj = parse(time_str) + if not is_aware(time_obj): + time_obj = make_aware(time_obj, timezone=tz(timezone)) + return time_obj.strftime('%Y-%m-%d %H:%M') + + def _safe_round(self, value, decimal_places): + if isinstance(value, (int, float)): + return self._round(value, decimal_places) + return value + + def _round(self, value, decimal_places): + try: + control = 10 ** decimal_places + return round(float(value) * control) / control + except (ValueError, TypeError): + return value def json(self, time=DEFAULT_TIME, **kwargs): try: diff --git a/openwisp_monitoring/monitoring/migrations/__init__.py b/openwisp_monitoring/monitoring/migrations/__init__.py index d8198ede..d7d3f6fc 100644 --- a/openwisp_monitoring/monitoring/migrations/__init__.py +++ b/openwisp_monitoring/monitoring/migrations/__init__.py @@ -1,3 +1,4 @@ +from asyncio.log import logger import swapper from django.contrib.auth.models import Permission diff --git a/openwisp_monitoring/views.py b/openwisp_monitoring/views.py index 840042a5..21ab811e 100644 --- a/openwisp_monitoring/views.py +++ b/openwisp_monitoring/views.py @@ -135,6 +135,11 @@ def _get_charts_data(self, charts, time, timezone, start_date, end_date): chart_dict['connect_points'] = chart.connect_points if chart.trace_labels: chart_dict['trace_labels'] = chart.trace_labels + # Handle None values in summary + if 'summary' in chart_dict: + for key, value in chart_dict['summary'].items(): + if value is None: + chart_dict['summary'][key] = 'N/A' except InvalidChartConfigException: logger.exception(f'Skipped chart for metric {chart.metric}') continue diff --git a/tests/openwisp2/settings.py b/tests/openwisp2/settings.py index bbe66916..bd27777d 100644 --- a/tests/openwisp2/settings.py +++ b/tests/openwisp2/settings.py @@ -7,8 +7,6 @@ TESTING = 'test' in sys.argv SHELL = 'shell' in sys.argv or 'shell_plus' in sys.argv BASE_DIR = os.path.dirname(os.path.abspath(__file__)) -INFLUXDB_BACKEND = 'openwisp_monitoring.db.backends.influxdb2' -INFLUXDB_BACKEND = 'openwisp_monitoring.db.backends.influxdb' DEBUG = True ALLOWED_HOSTS = ['*']