From a2194474a635a0cba42b48d59e1e349f60a26103 Mon Sep 17 00:00:00 2001 From: Prapti Sharma Date: Sat, 8 Jun 2024 14:53:05 +0530 Subject: [PATCH] [fix] Corrections in docker-compose #274 Fixes #274 --- Dockerfile | 22 +- docker-compose.yml | 32 ++- openwisp_monitoring/db/backends/base.py | 42 ---- .../db/backends/influxdb2/__init__.py | 0 .../db/backends/influxdb2/client.py | 79 ++----- .../db/backends/influxdb2/query_data.py | 197 ++++++++++++++++++ .../db/backends/influxdb2/write_data.py | 25 +++ tests/__init__.py | 0 tests/docker-entrypoint.sh | 19 +- 9 files changed, 264 insertions(+), 152 deletions(-) delete mode 100644 openwisp_monitoring/db/backends/base.py create mode 100644 openwisp_monitoring/db/backends/influxdb2/__init__.py create mode 100644 openwisp_monitoring/db/backends/influxdb2/query_data.py create mode 100644 openwisp_monitoring/db/backends/influxdb2/write_data.py create mode 100644 tests/__init__.py diff --git a/Dockerfile b/Dockerfile index 0c3808f60..8cf5a1a1e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,29 +14,17 @@ RUN pip install -U pip setuptools wheel COPY requirements-test.txt requirements.txt /opt/openwisp/ RUN pip install -r /opt/openwisp/requirements.txt && \ pip install -r /opt/openwisp/requirements-test.txt && \ - rm -rf /root/.cache/pip/* /tmp/* + rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/* -# Copy project files and install the project -COPY . /opt/openwisp +ADD . /opt/openwisp RUN pip install -U /opt/openwisp && \ - rm -rf /root/.cache/pip/* /tmp/* - -# Copy entrypoint script -COPY docker-entrypoint.sh /opt/openwisp/docker-entrypoint.sh -RUN chmod +x /opt/openwisp/docker-entrypoint.sh - -# Set working directory + rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/* WORKDIR /opt/openwisp/tests/ - # Set environment variables ENV NAME=openwisp-monitoring \ PYTHONBUFFERED=1 \ - INFLUXDB1_HOST=influxdb \ - INFLUXDB2_HOST=influxdb2 \ + INFLUXDB_HOST=influxdb \ REDIS_HOST=redis - -# Expose the application port +CMD ["sh", "docker-entrypoint.sh"] EXPOSE 8000 - # Command to run the application -ENTRYPOINT ["/opt/openwisp/docker-entrypoint.sh"] diff --git a/docker-compose.yml b/docker-compose.yml index f3b9aed04..3a8a3f39f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,7 +10,6 @@ services: - "8000:8000" depends_on: - influxdb - - influxdb2 - redis influxdb: @@ -30,32 +29,27 @@ services: INFLUXDB_USER_PASSWORD: openwisp influxdb2: - image: influxdb:2.0-alpine - volumes: - - influxdb2-data:/var/lib/influxdb2 + image: influxdb:2.0 + container_name: influxdb2 ports: - - "9999:9999" + # Map the 9086 port on host machine to 8086 in container + - "9086:8086" environment: DOCKER_INFLUXDB_INIT_MODE: setup - DOCKER_INFLUXDB_INIT_USERNAME: openwisp - DOCKER_INFLUXDB_INIT_PASSWORD: openwisp - DOCKER_INFLUXDB_INIT_ORG: openwisp - DOCKER_INFLUXDB_INIT_BUCKET: openwisp2 + DOCKER_INFLUXDB_INIT_USERNAME: myuser + DOCKER_INFLUXDB_INIT_PASSWORD: mypassword + DOCKER_INFLUXDB_INIT_ORG: myorg + DOCKER_INFLUXDB_INIT_BUCKET: mybucket DOCKER_INFLUXDB_INIT_RETENTION: 1w - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: my-super-secret-auth-token - INFLUXD_LOG_LEVEL: debug - healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:9999/health"] - interval: 30s - timeout: 10s - retries: 5 + volumes: + - influxdb-storage:/var/lib/influxdb2 redis: image: redis:5.0-alpine ports: - "6379:6379" - entrypoint: ["redis-server", "--appendonly", "yes"] + entrypoint: redis-server --appendonly yes volumes: - influxdb-data: - influxdb2-data: + influxdb-data: {} + influxdb-storage: diff --git a/openwisp_monitoring/db/backends/base.py b/openwisp_monitoring/db/backends/base.py deleted file mode 100644 index da23e5282..000000000 --- a/openwisp_monitoring/db/backends/base.py +++ /dev/null @@ -1,42 +0,0 @@ -import logging - -from django.utils.functional import cached_property - -from openwisp_monitoring.utils import retry - -logger = logging.getLogger(__name__) - - -class BaseDatabaseClient: - def __init__(self, db_name=None): - self._db = None - self.db_name = db_name - - @cached_property - def db(self): - raise NotImplementedError("Subclasses must implement `db` method") - - @retry - def create_database(self): - raise NotImplementedError("Subclasses must implement `create_database` method") - - @retry - def drop_database(self): - raise NotImplementedError("Subclasses must implement `drop_database` method") - - @retry - def query(self, query): - raise NotImplementedError("Subclasses must implement `query` method") - - def write(self, name, values, **kwargs): - raise NotImplementedError("Subclasses must implement `write` method") - - def get_list_retention_policies(self, name=None): - raise NotImplementedError( - "Subclasses must implement `get_list_retention_policies` method" - ) - - def create_or_alter_retention_policy(self, name, duration): - raise NotImplementedError( - "Subclasses must implement `create_or_alter_retention_policy` method" - ) diff --git a/openwisp_monitoring/db/backends/influxdb2/__init__.py b/openwisp_monitoring/db/backends/influxdb2/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/openwisp_monitoring/db/backends/influxdb2/client.py b/openwisp_monitoring/db/backends/influxdb2/client.py index 72e142534..1c8321ee9 100644 --- a/openwisp_monitoring/db/backends/influxdb2/client.py +++ b/openwisp_monitoring/db/backends/influxdb2/client.py @@ -1,78 +1,45 @@ import logging +import influxdb_client +from django.conf import settings from django.utils.functional import cached_property -from influxdb_client import InfluxDBClient, Point -from influxdb_client.client.exceptions import InfluxDBError from influxdb_client.client.write_api import SYNCHRONOUS from openwisp_monitoring.utils import retry -from ...exceptions import TimeseriesWriteException -from .. import TIMESERIES_DB -from ..base import BaseDatabaseClient - logger = logging.getLogger(__name__) -class DatabaseClient(BaseDatabaseClient): +class DatabaseClient: backend_name = 'influxdb2' - def __init__(self, db_name=None): - super().__init__(db_name) - self.client_error = InfluxDBError + def __init__(self): + self.token = settings.TIMESERIES_DB['TOKEN'] + self.org = settings.TIMESERIES_DB['ORG'] + self.bucket = settings.TIMESERIES_DB['BUCKET'] + self.url = ( + f"http://{settings.TIMESERIES_DB['HOST']}:{settings.TIMESERIES_DB['PORT']}" + ) @cached_property - def db(self): - return InfluxDBClient( - url=f"http://{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}", - token=TIMESERIES_DB['TOKEN'], - org=TIMESERIES_DB['ORG'], - bucket=self.db_name, + def client(self): + return influxdb_client.InfluxDBClient( + url=self.url, token=self.token, org=self.org ) - @retry - def create_database(self): - self.write_api = self.db.write_api(write_options=SYNCHRONOUS) - self.query_api = self.db.query_api() - logger.debug('Initialized APIs for InfluxDB 2.0') - - @retry - def drop_database(self): - pass # Implement as needed for InfluxDB 2.0 + @cached_property + def write_api(self): + return self.client.write_api(write_options=SYNCHRONOUS) @retry - def query(self, query): - return self.query_api.query(query) - def write(self, name, values, **kwargs): - point = Point(name).time(self._get_timestamp(kwargs.get('timestamp'))) - tags = kwargs.get('tags', {}) - for tag, value in tags.items(): - point.tag(tag, value) - for field, value in values.items(): - point.field(field, value) - try: - self.write_api.write(bucket=self.db_name, record=point) - except InfluxDBError as e: - raise TimeseriesWriteException(str(e)) + point = influxdb_client.Point(name).fields(values) + self.write_api.write(bucket=self.bucket, org=self.org, record=point) - @retry - def get_list_retention_policies(self, name=None): - bucket = self.db.buckets_api().find_bucket_by_name(name) - if bucket: - return bucket.retention_rules - return [] + @cached_property + def query_api(self): + return self.client.query_api() @retry - def create_or_alter_retention_policy(self, name, duration): - bucket = self.db.buckets_api().find_bucket_by_name(name) - retention_rules = [{"type": "expire", "everySeconds": duration}] - if bucket: - bucket.retention_rules = retention_rules - self.db.buckets_api().update_bucket(bucket=bucket) - else: - self.db.buckets_api().create_bucket( - bucket_name=name, - retention_rules=retention_rules, - org=TIMESERIES_DB["ORG"], - ) + def query(self, query): + return self.query_api.query(org=self.org, query=query) diff --git a/openwisp_monitoring/db/backends/influxdb2/query_data.py b/openwisp_monitoring/db/backends/influxdb2/query_data.py new file mode 100644 index 000000000..e0f53cb07 --- /dev/null +++ b/openwisp_monitoring/db/backends/influxdb2/query_data.py @@ -0,0 +1,197 @@ +import influxdb_client + +# Configuration for your InfluxDB client +bucket = "mybucket" +org = "myorg" +token = "t8Q3Y5mTWuqqTRdGyVxZuyVLO-8pl3I8KaNTR3jV7uTDr_GVECP5Z7LsrZwILGw79Xp4O8pAWkdqTREgIk073Q==" +url = "http://localhost:9086" + +client = influxdb_client.InfluxDBClient(url=url, token=token, org=org) + +query_api = client.query_api() + + +def generate_flux_query( + query_name, start_time, end_time, object_id, field_name, content_type, ifname +): + base_query = f''' +from(bucket: "{bucket}") + |> range(start: time(v: "{start_time}"), stop: time(v: "{end_time}")) + |> filter(fn: (r) => r._measurement == "{content_type}" and r.object_id == "{object_id}") + ''' + query_details = { + 'uptime': ''' + |> mean() + |> group(columns: ["_time"]) + |> yield(name: "uptime") + ''', + 'packet_loss': ''' + |> mean() + |> group(columns: ["_time"]) + |> yield(name: "packet_loss") + ''', + 'rtt': ''' + |> mean() + |> group(columns: ["_time"]) + |> yield(name: "rtt") +''', + 'wifi_clients': f''' + |> filter(fn: (r) => r.ifname == "{ifname}") + |> group(columns: ["{field_name}"]) + |> count(column: "{field_name}") + |> group(columns: ["_time"]) + |> yield(name: "wifi_clients") +''', + 'general_wifi_clients': f''' + |> filter(fn: (r) => true) // Add any specific filters for organization_id, location_id, etc., if applicable. + |> group(columns: ["{field_name}"]) + |> count(column: "{field_name}") + |> group(columns: ["_time"]) + |> yield(name: "general_wifi_clients") +''', + 'traffic': f''' + |> filter(fn: (r) => r.ifname == "{ifname}") + |> sum(column: "tx_bytes") + |> map(fn: (r) => ({{_time: r._time, upload: r._value / 1000000000}})) + |> sum(column: "rx_bytes") + |> map(fn: (r) => ({{_time: r._time, download: r._value / 1000000000}})) + |> group(columns: ["_time"]) + |> yield(name: "traffic") +''', + 'general_traffic': ''' + |> sum(column: "tx_bytes") + |> map(fn: (r) => ({{_time: r._time, upload: r._value / 1000000000}})) + |> sum(column: "rx_bytes") + |> map(fn: (r) => ({{_time: r._time, download: r._value / 1000000000}})) + |> group(columns: ["_time"]) + |> yield(name: "general_traffic") +''', + 'memory': ''' + |> mean(column: "percent_used") + |> group(columns: ["_time"]) + |> yield(name: "memory") +''', + 'cpu': ''' + |> mean(column: "cpu_usage") + |> group(columns: ["_time"]) + |> yield(name: "cpu") +''', + 'disk': ''' + |> mean(column: "used_disk") + |> group(columns: ["_time"]) + |> yield(name: "disk") +''', + 'signal_strength': ''' + |> mean(columns: ["signal_strength", "signal_power"]) + |> map(fn: (r) => ({{ signal_strength: round(r.signal_strength), signal_power: round(r.signal_power) }})) + |> group(columns: ["_time"]) + |> yield(name: "signal_strength") +''', + 'signal_quality': ''' + |> mean(columns: ["signal_quality", "snr"]) + |> map(fn: (r) => ({{ signal_quality: round(r.signal_quality), snr: round(r.snr) }})) + |> group(columns: ["_time"]) + |> yield(name: "signal_quality") +''', + 'access_tech': ''' + |> mode(column: "access_tech") + |> group(columns: ["_time"]) + |> yield(name: "access_tech") +''', + 'bandwidth': ''' + |> mean(columns: ["sent_bps_tcp", "sent_bps_udp"]) + |> map(fn: (r) => ({{ tcp: r.sent_bps_tcp / 1000000000, udp: r.sent_bps_udp / 1000000000}})) + |> group(columns: ["_time"]) + |> yield(name: "bandwidth") +''', + 'transfer': ''' + |> sum(columns: ["sent_bytes_tcp", "sent_bytes_udp"]) + |> map(fn: (r) => ({{ tcp: r.sent_bytes_tcp / 1000000000, udp: r.sent_bytes_udp / 1000000000 }})) + |> group(columns: ["_time"]) + |> yield(name: "transfer") +''', + 'retransmits': ''' + |> mean(column: "retransmits") + |> group(columns: ["_time"]) + |> yield(name: "retransmits") +''', + 'jitter': ''' + |> mean(column: "jitter") + |> group(columns: ["_time"]) + |> yield(name: "jitter") +''', + 'datagram': ''' + |> mean(columns: ["lost_packets", "total_packets"]) + |> group(columns: ["_time"]) + |> yield(name: "datagram") +''', + 'datagram_loss': ''' + |> mean(column: "lost_percent") + |> group(columns: ["_time"]) + |> yield(name: "datagram_loss") +''', + } + + return base_query + query_details.get( + query_name, '// No query found for the given name' + ) + + +def execute_query(flux_query): + print( + f"Executing Query: {flux_query[:50]}..." + ) # Log the query start (only part of it for readability) + result = query_api.query(org=org, query=flux_query) + results = [] + for table in result: + for record in table.records: + results.append((record.get_field(), record.get_value())) + return results + + +def main(): + start_time = "2023-01-01T00:00:00Z" + end_time = "2023-01-07T00:00:00Z" + object_id = "12345" + field_name = "temperature" + content_type = "environment" + ifname = "eth0" + + query_names = [ + 'uptime', + 'packet_loss', + 'rtt', + 'wifi_clients', + 'general_wifi_clients', + 'traffic', + 'general_traffic', + 'memory', + 'cpu', + 'disk', + 'signal_strength', + 'signal_quality', + 'access_tech', + 'bandwidth', + 'transfer', + 'retransmits', + 'jitter', + 'datagram', + 'datagram_loss', + ] + + for query_name in query_names: + flux_query = generate_flux_query( + query_name, + start_time, + end_time, + object_id, + field_name, + content_type, + ifname, + ) + results = execute_query(flux_query) + print(f"Results for {query_name} query:", results) + + +if __name__ == "__main__": + main() diff --git a/openwisp_monitoring/db/backends/influxdb2/write_data.py b/openwisp_monitoring/db/backends/influxdb2/write_data.py new file mode 100644 index 000000000..2bd5c29cf --- /dev/null +++ b/openwisp_monitoring/db/backends/influxdb2/write_data.py @@ -0,0 +1,25 @@ +import influxdb_client +from influxdb_client.client.write_api import SYNCHRONOUS + +bucket = "mybucket" +org = "myorg" +token = "t8Q3Y5mTWuqqTRdGyVxZuyVLO-8pl3I8KaNTR3jV7uTDr_GVECP5Z7LsrZwILGw79Xp4O8pAWkdqTREgIk073Q==" +url = "http://localhost:9086" + +client = influxdb_client.InfluxDBClient(url=url, token=token, org=org) + +write_api = client.write_api(write_options=SYNCHRONOUS) + +p = ( + influxdb_client.Point("my_measurement") + .tag("location", "Prague") + .field("temperature", 25.3) +) + +try: + write_api.write(bucket=bucket, org=org, record=p) + print("Data written successfully.") +except Exception as e: + print(f"Failed to write data: {e}") + +client.close() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/docker-entrypoint.sh b/tests/docker-entrypoint.sh index ffb8aacf0..cbd50bf70 100755 --- a/tests/docker-entrypoint.sh +++ b/tests/docker-entrypoint.sh @@ -1,16 +1,12 @@ #!/bin/bash -set -e - create_superuser() { local username="$1" local email="$2" local password="$3" cat <