Skip to content

Commit

Permalink
tests: adds integration tests of context manager on write.
Browse files Browse the repository at this point in the history
  • Loading branch information
karel-rehor committed Sep 17, 2024
1 parent c6bed42 commit e5b8f7f
Showing 1 changed file with 113 additions and 1 deletion.
114 changes: 113 additions & 1 deletion tests/test_influxdb_client_3_integration.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import logging
import os
import random
import string
import time
import unittest

import pyarrow
import pytest

from influxdb_client_3 import InfluxDBClient3, InfluxDBError
from influxdb_client_3 import InfluxDBClient3, InfluxDBError, write_client_options, WriteOptions


def random_hex(len=6):
return ''.join(random.choice(string.hexdigits) for i in range(len))


@pytest.mark.integration
Expand All @@ -20,13 +28,19 @@
)
class TestInfluxDBClient3Integration(unittest.TestCase):

@pytest.fixture(autouse=True)
def inject_fixtures(self, caplog):
self._caplog = caplog

def setUp(self):
self.host = os.getenv('TESTING_INFLUXDB_URL')
self.token = os.getenv('TESTING_INFLUXDB_TOKEN')
self.database = os.getenv('TESTING_INFLUXDB_DATABASE')
self.client = InfluxDBClient3(host=self.host, database=self.database, token=self.token)

def tearDown(self):
self._caplog.clear()
self._caplog.set_level(logging.ERROR)
if self.client:
self.client.close()

Expand Down Expand Up @@ -72,3 +86,101 @@ def test_error_headers(self):
self.assertIsNotNone(headers['X-Influxdb-Build'])
except KeyError as ke:
self.fail(f'Header {ke} not found')

def test_batch_write_callbacks(self):
write_success = False
write_error = False
write_count = 0

measurement = f'test{random_hex(6)}'
data_set_size = 40
batch_size = 10

def success(conf, data):
nonlocal write_success, write_count
write_success = True
write_count += 1

def error(conf, data, exception: InfluxDBError):
nonlocal write_error
write_error = True

w_opts = WriteOptions(
batch_size=batch_size,
flush_interval=1_000,
jitter_interval=500
)

wc_opts = write_client_options(
success_callback=success,
error_callback=error,
write_options=w_opts
)

now = time.time_ns()
with InfluxDBClient3(host=self.host,
database=self.database,
token=self.token,
write_client_options=wc_opts) as w_client:

for i in range(0, data_set_size):
w_client.write(f'{measurement},location=harfa val={i}i {now - (i * 1_000_000_000)}')

self.assertEqual(data_set_size / batch_size, write_count)
self.assertTrue(write_success)
self.assertFalse(write_error)

with InfluxDBClient3(host=self.host,
database=self.database,
token=self.token,
write_client_options=wc_opts) as r_client:

query = f"SELECT * FROM \"{measurement}\" WHERE time >= now() - interval '3 minute'"
reader: pyarrow.Table = r_client.query(query)
list_results = reader.to_pylist()
self.assertEqual(data_set_size, len(list_results))

def test_batch_write_closed(self):

self._caplog.set_level(logging.DEBUG)
# writing measurement for last cca 3hrs
# so repeat runs in that time frame could
# result in clashed result data if always
# using same measurement name
measurement = f'test{random_hex()}'
data_size = 10_000
w_opts = WriteOptions(
batch_size=100,
flush_interval=3_000,
jitter_interval=500
)

wc_opts = write_client_options(
write_options=w_opts
)

now = time.time_ns()
with InfluxDBClient3(host=self.host,
database=self.database,
token=self.token,
write_client_options=wc_opts,
debug=True) as w_client:

for i in range(0, data_size):
w_client.write(f'{measurement},location=harfa val={i}i {now - (i * 1_000_000_000)}')

lines = self._caplog.text.splitlines()
self.assertRegex(lines[len(lines) - 1], ".*the batching processor was disposed$")

logging.info("WRITING DONE")
with InfluxDBClient3(host=self.host,
database=self.database,
token=self.token,
write_client_options=wc_opts) as r_client:

logging.info("PREPARING QUERY")

query = f"SELECT * FROM \"{measurement}\" WHERE time >= now() - interval '3 hours'"
reader: pyarrow.Table = r_client.query(query, mode="")
list_results = reader.to_pylist()
self.assertEqual(data_size, len(list_results))

0 comments on commit e5b8f7f

Please sign in to comment.