-
-
Notifications
You must be signed in to change notification settings - Fork 138
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support InfluxDB 2.x as addition to 1.8
- Loading branch information
1 parent
cfc5c69
commit cf5f93d
Showing
6 changed files
with
68 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,45 +1,88 @@ | ||
import re | ||
from sys import exit | ||
from logging import getLogger | ||
from influxdb import InfluxDBClient | ||
from requests.exceptions import ConnectionError | ||
from influxdb.exceptions import InfluxDBServerError | ||
from influxdb_client import InfluxDBClient, BucketRetentionRules | ||
from influxdb_client.client.write_api import SYNCHRONOUS | ||
from influxdb_client.client.exceptions import InfluxDBError | ||
from urllib3.exceptions import NewConnectionError | ||
|
||
|
||
class DBManager(object): | ||
def __init__(self, server): | ||
self.server = server | ||
self.logger = getLogger() | ||
self.bucket = "varken" | ||
|
||
if self.server.url == "influxdb.domain.tld": | ||
self.logger.critical("You have not configured your varken.ini. Please read Wiki page for configuration") | ||
exit() | ||
self.influx = InfluxDBClient(host=self.server.url, port=self.server.port, username=self.server.username, | ||
password=self.server.password, ssl=self.server.ssl, database='varken', | ||
verify_ssl=self.server.verify_ssl) | ||
|
||
url = self.server.url | ||
if 'http' not in url: | ||
scheme = 'http' | ||
if self.server.ssl: | ||
scheme = 'https' | ||
url = "{}://{}d:{}".format(scheme, self.server.url, self.server.port) | ||
|
||
token = f'{self.server.username}:password={self.server.password}' | ||
|
||
self.influx = InfluxDBClient(url=url, token=token, | ||
verify_ssl=self.server.verify_ssl, org=self.server.org) | ||
|
||
try: | ||
version = self.influx.request('ping', expected_response_code=204).headers['X-Influxdb-Version'] | ||
version = self.influx.version() | ||
self.logger.info('Influxdb version: %s', version) | ||
except ConnectionError: | ||
self.logger.critical("Error testing connection to InfluxDB. Please check your url/hostname") | ||
match = re.match(r'(\d+)\.', version) | ||
if match: | ||
self.version = int(match[1]) | ||
self.logger.info("Using InfluxDB API v{}", self.version) | ||
else: | ||
self.logger.critical("Unknown influxdb version") | ||
exit(1) | ||
except NewConnectionError: | ||
self.logger.critical("Error getting InfluxDB version number. Please check your url/hostname are valid") | ||
exit(1) | ||
|
||
databases = [db['name'] for db in self.influx.get_list_database()] | ||
if self.version >= 2: | ||
self.create_v2_bucket() | ||
else: | ||
self.create_v1_database() | ||
|
||
if 'varken' not in databases: | ||
def create_v2_bucket(self): | ||
if not self.influx.buckets_api().find_bucket_by_name(self.bucket): | ||
self.logger.info("Creating varken bucket") | ||
|
||
retention = BucketRetentionRules(type="expire", every_seconds=60 * 60 * 24 * 30, | ||
shard_group_duration_seconds=60 * 60) | ||
self.influx.buckets_api().create_bucket(bucket_name=self.bucket, | ||
retention_rules=retention) | ||
|
||
def create_v1_database(self): | ||
from influxdb import InfluxDBClient | ||
client = InfluxDBClient(host=self.server.url, port=self.server.port, username=self.server.username, | ||
password=self.server.password, ssl=self.server.ssl, database=self.bucket, | ||
verify_ssl=self.server.verify_ssl) | ||
databases = [db['name'] for db in client.get_list_database()] | ||
|
||
if self.bucket not in databases: | ||
self.logger.info("Creating varken database") | ||
self.influx.create_database('varken') | ||
client.create_database(self.bucket) | ||
|
||
retention_policies = [policy['name'] for policy in | ||
self.influx.get_list_retention_policies(database='varken')] | ||
client.get_list_retention_policies(database=self.bucket)] | ||
if 'varken 30d-1h' not in retention_policies: | ||
self.logger.info("Creating varken retention policy (30d-1h)") | ||
self.influx.create_retention_policy(name='varken 30d-1h', duration='30d', replication='1', | ||
database='varken', default=True, shard_duration='1h') | ||
client.create_retention_policy(name='varken 30d-1h', duration='30d', replication='1', | ||
database=self.bucket, default=True, shard_duration='1h') | ||
|
||
self.bucket = f'{self.bucket}/varken 30d-1h' | ||
|
||
def write_points(self, data): | ||
d = data | ||
self.logger.debug('Writing Data to InfluxDB %s', d) | ||
write_api = self.influx.write_api(write_options=SYNCHRONOUS) | ||
try: | ||
self.influx.write_points(d) | ||
except (InfluxDBServerError, ConnectionError) as e: | ||
write_api.write(bucket=self.bucket, record=data) | ||
except (InfluxDBError, NewConnectionError) as e: | ||
self.logger.error('Error writing data to influxdb. Dropping this set of data. ' | ||
'Check your database! Error: %s', e) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters