From 07b4b7de6a265da66df5d420ea32b8c3b74dc579 Mon Sep 17 00:00:00 2001 From: Nathan Adams Date: Sun, 17 Jul 2022 21:15:58 +0200 Subject: [PATCH] Support InfluxDB 2.x as addition to 1.8 --- data/varken.example.ini | 1 + requirements.txt | 1 + varken/dbmanager.py | 77 ++++++++++++++++++++++++++++++++--------- varken/iniparser.py | 4 ++- varken/structures.py | 1 + varken/tautulli.py | 26 +++++++------- 6 files changed, 79 insertions(+), 31 deletions(-) diff --git a/data/varken.example.ini b/data/varken.example.ini index b32eab6d..a63285c3 100644 --- a/data/varken.example.ini +++ b/data/varken.example.ini @@ -16,6 +16,7 @@ ssl = false verify_ssl = false username = root password = root +org = - [tautulli-1] url = tautulli.domain.tld:8181 diff --git a/requirements.txt b/requirements.txt index 523e4279..4477bd88 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ requests==2.25.1 geoip2==2.9.0 influxdb==5.2.0 +influxdb-client==1.30.0 schedule==0.6.0 distro==1.4.0 urllib3==1.26.5 diff --git a/varken/dbmanager.py b/varken/dbmanager.py index c832fdf3..aaf2104e 100644 --- a/varken/dbmanager.py +++ b/varken/dbmanager.py @@ -1,45 +1,88 @@ +import re from sys import exit from logging import getLogger -from influxdb import InfluxDBClient -from requests.exceptions import ConnectionError -from influxdb.exceptions import InfluxDBServerError +from influxdb_client import InfluxDBClient, BucketRetentionRules +from influxdb_client.client.write_api import SYNCHRONOUS +from influxdb_client.client.exceptions import InfluxDBError +from urllib3.exceptions import NewConnectionError class DBManager(object): def __init__(self, server): self.server = server self.logger = getLogger() + self.bucket = "varken" + if self.server.url == "influxdb.domain.tld": self.logger.critical("You have not configured your varken.ini. Please read Wiki page for configuration") exit() - self.influx = InfluxDBClient(host=self.server.url, port=self.server.port, username=self.server.username, - password=self.server.password, ssl=self.server.ssl, database='varken', - verify_ssl=self.server.verify_ssl) + + url = self.server.url + if 'http' not in url: + scheme = 'http' + if self.server.ssl: + scheme = 'https' + url = "{}://{}d:{}".format(scheme, self.server.url, self.server.port) + + token = f'{self.server.username}:password={self.server.password}' + + self.influx = InfluxDBClient(url=url, token=token, + verify_ssl=self.server.verify_ssl, org=self.server.org) + try: - version = self.influx.request('ping', expected_response_code=204).headers['X-Influxdb-Version'] + version = self.influx.version() self.logger.info('Influxdb version: %s', version) - except ConnectionError: - self.logger.critical("Error testing connection to InfluxDB. Please check your url/hostname") + match = re.match(r'(\d+)\.', version) + if match: + self.version = int(match[1]) + self.logger.info("Using InfluxDB API v{}", self.version) + else: + self.logger.critical("Unknown influxdb version") + exit(1) + except NewConnectionError: + self.logger.critical("Error getting InfluxDB version number. Please check your url/hostname are valid") exit(1) - databases = [db['name'] for db in self.influx.get_list_database()] + if self.version >= 2: + self.create_v2_bucket() + else: + self.create_v1_database() - if 'varken' not in databases: + def create_v2_bucket(self): + if not self.influx.buckets_api().find_bucket_by_name(self.bucket): + self.logger.info("Creating varken bucket") + + retention = BucketRetentionRules(type="expire", every_seconds=60 * 60 * 24 * 30, + shard_group_duration_seconds=60 * 60) + self.influx.buckets_api().create_bucket(bucket_name=self.bucket, + retention_rules=retention) + + def create_v1_database(self): + from influxdb import InfluxDBClient + client = InfluxDBClient(host=self.server.url, port=self.server.port, username=self.server.username, + password=self.server.password, ssl=self.server.ssl, database=self.bucket, + verify_ssl=self.server.verify_ssl) + databases = [db['name'] for db in client.get_list_database()] + + if self.bucket not in databases: self.logger.info("Creating varken database") - self.influx.create_database('varken') + client.create_database(self.bucket) retention_policies = [policy['name'] for policy in - self.influx.get_list_retention_policies(database='varken')] + client.get_list_retention_policies(database=self.bucket)] if 'varken 30d-1h' not in retention_policies: self.logger.info("Creating varken retention policy (30d-1h)") - self.influx.create_retention_policy(name='varken 30d-1h', duration='30d', replication='1', - database='varken', default=True, shard_duration='1h') + client.create_retention_policy(name='varken 30d-1h', duration='30d', replication='1', + database=self.bucket, default=True, shard_duration='1h') + + self.bucket = f'{self.bucket}/varken 30d-1h' def write_points(self, data): d = data self.logger.debug('Writing Data to InfluxDB %s', d) + write_api = self.influx.write_api(write_options=SYNCHRONOUS) try: - self.influx.write_points(d) - except (InfluxDBServerError, ConnectionError) as e: + write_api.write(bucket=self.bucket, record=data) + except (InfluxDBError, NewConnectionError) as e: self.logger.error('Error writing data to influxdb. Dropping this set of data. ' 'Check your database! Error: %s', e) diff --git a/varken/iniparser.py b/varken/iniparser.py index bcb3b37d..3eca865f 100644 --- a/varken/iniparser.py +++ b/varken/iniparser.py @@ -154,13 +154,15 @@ def parse_opts(self, read_file=False): username = env.get('VRKN_INFLUXDB_USERNAME', self.config.get('influxdb', 'username')) password = env.get('VRKN_INFLUXDB_PASSWORD', self.config.get('influxdb', 'password')) + + org = env.get('VRKN_INFLUXDB_ORG', self.config.get('influxdb', 'org')) except NoOptionError as e: self.logger.error('Missing key in %s. Error: %s', "influxdb", e) self.rectify_ini() return self.influx_server = InfluxServer(url=url, port=port, username=username, password=password, ssl=ssl, - verify_ssl=verify_ssl) + verify_ssl=verify_ssl, org=org) # Check for all enabled services for service in self.services: diff --git a/varken/structures.py b/varken/structures.py index e3ee0940..2ec78ff9 100644 --- a/varken/structures.py +++ b/varken/structures.py @@ -18,6 +18,7 @@ class InfluxServer(NamedTuple): url: str = 'localhost' username: str = 'root' verify_ssl: bool = False + org: str = '-' class SonarrServer(NamedTuple): diff --git a/varken/tautulli.py b/varken/tautulli.py index 746685fd..ca585255 100644 --- a/varken/tautulli.py +++ b/varken/tautulli.py @@ -2,7 +2,7 @@ from requests import Session, Request from geoip2.errors import AddressNotFoundError from datetime import datetime, timezone, date, timedelta -from influxdb.exceptions import InfluxDBClientError +from influxdb_client.client.exceptions import InfluxDBError from varken.structures import TautulliStream from varken.helpers import hashit, connection_handler, itemgetter_with_default @@ -202,17 +202,17 @@ def get_stats(self): for library in get: data = { - "measurement": "Tautulli", - "tags": { - "type": "library_stats", - "server": self.server.id, - "section_name": library['section_name'], - "section_type": library['section_type'] - }, - "time": now, - "fields": { - "total": int(library['count']) - } + "measurement": "Tautulli", + "tags": { + "type": "library_stats", + "server": self.server.id, + "section_name": library['section_name'], + "section_type": library['section_type'] + }, + "time": now, + "fields": { + "total": int(library['count']) + } } if library['section_type'] == 'show': data['fields']['seasons'] = int(library['parent_count']) @@ -363,7 +363,7 @@ def get_historical(self, days=30): ) try: self.dbmanager.write_points(influx_payload) - except InfluxDBClientError as e: + except InfluxDBError as e: if "beyond retention policy" in str(e): self.logger.debug('Only imported 30 days of data per retention policy') else: