-
-
Notifications
You must be signed in to change notification settings - Fork 120
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[fix] Corrections in docker-compose #274
Fixes #274
- Loading branch information
1 parent
4a6c3aa
commit a219447
Showing
9 changed files
with
264 additions
and
152 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,78 +1,45 @@ | ||
import logging | ||
|
||
import influxdb_client | ||
from django.conf import settings | ||
from django.utils.functional import cached_property | ||
from influxdb_client import InfluxDBClient, Point | ||
from influxdb_client.client.exceptions import InfluxDBError | ||
from influxdb_client.client.write_api import SYNCHRONOUS | ||
|
||
from openwisp_monitoring.utils import retry | ||
|
||
from ...exceptions import TimeseriesWriteException | ||
from .. import TIMESERIES_DB | ||
from ..base import BaseDatabaseClient | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class DatabaseClient(BaseDatabaseClient): | ||
class DatabaseClient: | ||
backend_name = 'influxdb2' | ||
|
||
def __init__(self, db_name=None): | ||
super().__init__(db_name) | ||
self.client_error = InfluxDBError | ||
def __init__(self): | ||
self.token = settings.TIMESERIES_DB['TOKEN'] | ||
self.org = settings.TIMESERIES_DB['ORG'] | ||
self.bucket = settings.TIMESERIES_DB['BUCKET'] | ||
self.url = ( | ||
f"http://{settings.TIMESERIES_DB['HOST']}:{settings.TIMESERIES_DB['PORT']}" | ||
) | ||
|
||
@cached_property | ||
def db(self): | ||
return InfluxDBClient( | ||
url=f"http://{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}", | ||
token=TIMESERIES_DB['TOKEN'], | ||
org=TIMESERIES_DB['ORG'], | ||
bucket=self.db_name, | ||
def client(self): | ||
return influxdb_client.InfluxDBClient( | ||
url=self.url, token=self.token, org=self.org | ||
) | ||
|
||
@retry | ||
def create_database(self): | ||
self.write_api = self.db.write_api(write_options=SYNCHRONOUS) | ||
self.query_api = self.db.query_api() | ||
logger.debug('Initialized APIs for InfluxDB 2.0') | ||
|
||
@retry | ||
def drop_database(self): | ||
pass # Implement as needed for InfluxDB 2.0 | ||
@cached_property | ||
def write_api(self): | ||
return self.client.write_api(write_options=SYNCHRONOUS) | ||
|
||
@retry | ||
def query(self, query): | ||
return self.query_api.query(query) | ||
|
||
def write(self, name, values, **kwargs): | ||
point = Point(name).time(self._get_timestamp(kwargs.get('timestamp'))) | ||
tags = kwargs.get('tags', {}) | ||
for tag, value in tags.items(): | ||
point.tag(tag, value) | ||
for field, value in values.items(): | ||
point.field(field, value) | ||
try: | ||
self.write_api.write(bucket=self.db_name, record=point) | ||
except InfluxDBError as e: | ||
raise TimeseriesWriteException(str(e)) | ||
point = influxdb_client.Point(name).fields(values) | ||
self.write_api.write(bucket=self.bucket, org=self.org, record=point) | ||
|
||
@retry | ||
def get_list_retention_policies(self, name=None): | ||
bucket = self.db.buckets_api().find_bucket_by_name(name) | ||
if bucket: | ||
return bucket.retention_rules | ||
return [] | ||
@cached_property | ||
def query_api(self): | ||
return self.client.query_api() | ||
|
||
@retry | ||
def create_or_alter_retention_policy(self, name, duration): | ||
bucket = self.db.buckets_api().find_bucket_by_name(name) | ||
retention_rules = [{"type": "expire", "everySeconds": duration}] | ||
if bucket: | ||
bucket.retention_rules = retention_rules | ||
self.db.buckets_api().update_bucket(bucket=bucket) | ||
else: | ||
self.db.buckets_api().create_bucket( | ||
bucket_name=name, | ||
retention_rules=retention_rules, | ||
org=TIMESERIES_DB["ORG"], | ||
) | ||
def query(self, query): | ||
return self.query_api.query(org=self.org, query=query) |
Oops, something went wrong.