diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml index c78bfca..e256a7a 100644 --- a/.github/workflows/unittest.yml +++ b/.github/workflows/unittest.yml @@ -25,10 +25,17 @@ jobs: - name: install run: | python -m pip install -e . + - name: Stand up docker containers for ActiveMQ and PostgreSQL + run: | + docker compose up -d + sleep 1 - name: run unit tests run: | echo "running unit tests" python -m pytest --cov=src --cov-report=xml --cov-report=term-missing tests/ + - name: Stand down docker containers for ActiveMQ and PostgreSQL + run: | + docker compose down - name: upload coverage to codecov uses: codecov/codecov-action@v5 if: diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..a434fba --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,19 @@ +services: + + db: + restart: always + image: postgres:14 + environment: + POSTGRES_DB: workflow + POSTGRES_USER: workflow + POSTGRES_PASSWORD: workflow + ports: + - 5432:5432 + + activemq: + image: apache/activemq-artemis:latest-alpine + volumes: + - ./tests/data/broker.xml:/var/lib/artemis-instance/etc-override/broker.xml + ports: + - 8161:8161 + - 61613:61613 diff --git a/src/artemis_data_collector/artemis_data_collector.py b/src/artemis_data_collector/artemis_data_collector.py index d4ec7cb..ce74a52 100644 --- a/src/artemis_data_collector/artemis_data_collector.py +++ b/src/artemis_data_collector/artemis_data_collector.py @@ -10,12 +10,18 @@ logger = logging.getLogger("AtremisDataCollector") -def initialize_database_tables(db_hostname, db_port, db_user, db_password, db_name): +def initialize_database_tables(config): """Initializes the tables in the database from sql files. This will fail if the tables already exist. WebMon should have already created the tables so this is mostly for testing.""" logger.info("Initializing tables") - with psycopg.connect(dbname=db_name, host=db_hostname, port=db_port, user=db_user, password=db_password) as conn: + with psycopg.connect( + dbname=config.database_name, + host=config.database_hostname, + port=config.database_port, + user=config.database_user, + password=config.database_password, + ) as conn: with conn.cursor() as cur: cur.execute(files("artemis_data_collector.sql").joinpath("report_statusqueue.sql").read_text()) conn.commit() @@ -90,11 +96,8 @@ def request_activemq(self, query): """Make a request to ActiveMQ Artemis Jolokia API""" try: response = self.session.get(self.base_url + query) - except requests.exceptions.Timeout as e: - logger.error(f"Timeout: {e}") - return None except requests.exceptions.RequestException as e: - logger.error(f"Error: {e}") + logger.error(e) return None if response.status_code != 200: @@ -164,7 +167,7 @@ def get_database_statusqueues(self): return queue_map -def main(): +def parse_args(args): parser = argparse.ArgumentParser(description="Collect data from Artemis") parser.add_argument("--version", action="version", version="%(prog)s 1.0") parser.add_argument( @@ -187,24 +190,21 @@ def main(): parser.add_argument("--interval", type=int, default=600, help="Interval to collect data (seconds)") parser.add_argument("--log_level", default="INFO", help="Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)") parser.add_argument("--log_file", help="Log file. If not specified, log to stdout") - config = parser.parse_args() + return parser.parse_args(args) + +def main(): + config = parse_args(sys.argv[1:]) # setup logging logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=config.log_level, filename=config.log_file ) - try: - if config.initialize_db: - initialize_database_tables( - config.database_hostname, - config.database_port, - config.database_user, - config.database_password, - config.database_name, - ) - return 0 + if config.initialize_db: + initialize_database_tables(config) + return 0 + try: adc = ArtemisDataCollector(config) adc.run() except KeyboardInterrupt: @@ -212,7 +212,7 @@ def main(): return 0 except Exception as e: # catch any unhandled exception and log it before exiting - logger.exception(f"Error: {e}") + logger.exception(e) return 1 diff --git a/tests/data/broker.xml b/tests/data/broker.xml new file mode 100644 index 0000000..204ca00 --- /dev/null +++ b/tests/data/broker.xml @@ -0,0 +1,277 @@ + + + + + + + + 0.0.0.0 + + + true + + + 1 + + + ASYNCIO + + data/paging + + data/bindings + + data/journal + + data/large-messages + + + + + + + true + + 2 + + 10 + + 4096 + + 10M + + + 16000 + + + + 4096 + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + true + + 120000 + + 60000 + + HALT + + + 932000 + + + + + + -1 + + + + + + + + + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false + + + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true + + + tcp://0.0.0.0:61613?anycastPrefix=/queue/;multicastPrefix=/topic/;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true + + + tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true + + + tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + 10 + PAGE + true + true + false + false + + + 10M + + + + -1 + -1 + + + + -1 + 20M + + + + -1 + -1 + + + + +
+ + + +
+
+ + + +
+ +
+ + + + +
+
diff --git a/tests/test_artemis_data_collector.py b/tests/test_artemis_data_collector.py new file mode 100644 index 0000000..4bb1b4d --- /dev/null +++ b/tests/test_artemis_data_collector.py @@ -0,0 +1,220 @@ +import unittest +from collections import namedtuple + +import psycopg +import pytest +import stomp + +from artemis_data_collector.artemis_data_collector import ArtemisDataCollector, initialize_database_tables, parse_args + +Config = namedtuple( + "Config", + [ + "artemis_user", + "artemis_password", + "artemis_url", + "broker_name", + "queue_list", + "database_hostname", + "database_port", + "database_user", + "database_password", + "database_name", + ], +) +config = Config( + "artemis", + "artemis", + "http://localhost:8161", + "0.0.0.0", + ["TEST_QUEUE", "TEST_QUEUE2", "DLD", "DOES_NOT_EXIST"], + "localhost", + 5432, + "workflow", + "workflow", + "workflow", +) + + +class TestArtemisDataCollector(unittest.TestCase): + @classmethod + def setUpClass(cls): + # initialize database tables + try: + initialize_database_tables(config) + except psycopg.errors.DuplicateTable: + pass + + # Add test queue to the database + with psycopg.connect( + dbname=config.database_name, + host=config.database_hostname, + port=config.database_port, + user=config.database_user, + password=config.database_password, + ) as conn: + with conn.cursor() as cur: + # create test queue if it does not exist + cur.execute("SELECT id FROM report_statusqueue WHERE name = 'TEST_QUEUE'") + queue_id = cur.fetchone() + if queue_id is None: + cur.execute("INSERT INTO report_statusqueue (name, is_workflow_input) VALUES ('TEST_QUEUE', true)") + cur.execute("SELECT id FROM report_statusqueue WHERE name = 'TEST_QUEUE'") + queue_id = cur.fetchone() + cls.queue_id = queue_id[0] + + # create another test queue, one that doesn't exist in the Artemis broker + cur.execute("SELECT id FROM report_statusqueue WHERE name = 'TEST_QUEUE2'") + if cur.fetchone() is None: + cur.execute("INSERT INTO report_statusqueue (name, is_workflow_input) VALUES ('TEST_QUEUE2', true)") + + # create activeMQ Artemis test queue by sending a message with stomp + conn = stomp.Connection(host_and_ports=[("localhost", 61613)]) + conn.connect("artemis", "artemis", wait=True) + conn.send("/queue/TEST_QUEUE", "test") + conn.disconnect() + + def test_get_database_statusqueues(self): + adc = ArtemisDataCollector(config) + queues = adc.get_database_statusqueues() + assert isinstance(queues, dict) + assert "TEST_QUEUE" in queues + assert queues["TEST_QUEUE"] == self.queue_id + + def test_get_activemq_queues(self): + adc = ArtemisDataCollector(config) + queues = adc.get_activemq_queues() + assert isinstance(queues, list) + assert "TEST_QUEUE" in queues + + def test_collect_data(self): + adc = ArtemisDataCollector(config) + data = adc.collect_data() + assert isinstance(data, list) + assert len(data) == 1 + assert data[0][0] == self.queue_id + assert data[0][1] > 0 + + def get_latest_queue_message_count(self): + with psycopg.connect( + dbname=config.database_name, + host=config.database_hostname, + port=config.database_port, + user=config.database_user, + password=config.database_password, + ) as conn: + with conn.cursor() as cur: + # get newest record + cur.execute( + "SELECT (id, queue_id, message_count) FROM report_statusqueuemessagecount ORDER BY id DESC LIMIT 1" + ) + result = cur.fetchone() + + return [int(x) for x in result[0]] + + def test_add_to_database(self): + adc = ArtemisDataCollector(config) + adc.add_to_database([(self.queue_id, 0)]) + + result = self.get_latest_queue_message_count() + current_id = result[0] + assert result[1] == self.queue_id + assert result[2] == 0 + + adc = ArtemisDataCollector(config) + adc.add_to_database([(self.queue_id, 42)]) + + result = self.get_latest_queue_message_count() + assert result[0] == current_id + 1 + assert result[1] == self.queue_id + assert result[2] == 42 + + def test_no_valid_queues(self): + config_no_valid_queues = Config( + "artemis", + "artemis", + "http://localhost:8161", + "0.0.0.0", + ["AAA", "BBB"], + "localhost", + 5432, + "workflow", + "workflow", + "workflow", + ) + + with pytest.raises(ValueError) as e: + ArtemisDataCollector(config_no_valid_queues) + + assert "No queues to monitor" in str(e) + + def test_invalid_artemis_url(self): + config_invalid_url = Config( + "artemis", + "artemis", + "http://localhost:12345", + "0.0.0.0", + ["TEST_QUEUE"], + "localhost", + 5432, + "workflow", + "workflow", + "workflow", + ) + + with pytest.raises(ValueError) as e: + ArtemisDataCollector(config_invalid_url) + + assert "Failed to get queues from ActiveMQ Artemis" in str(e) + + def test_wrong_artemis_password(self): + config_wrong_password = Config( + "artemis", + "AAA", + "http://localhost:8161", + "0.0.0.0", + ["TEST_QUEUE"], + "localhost", + 5432, + "workflow", + "workflow", + "workflow", + ) + + with pytest.raises(ValueError) as e: + ArtemisDataCollector(config_wrong_password) + + assert "Failed to get queues from ActiveMQ Artemis" in str(e) + + def test_wrong_broker_name(self): + config_wrong_broker_name = Config( + "artemis", + "artemis", + "http://localhost:8161", + "AAA", + ["TEST_QUEUE"], + "localhost", + 5432, + "workflow", + "workflow", + "workflow", + ) + + with pytest.raises(ValueError) as e: + ArtemisDataCollector(config_wrong_broker_name) + + assert "Failed to get queues from ActiveMQ Artemis" in str(e) + + +def test_parse_args(): + # test some default values + args = parse_args([]) + assert args.initialize_db is False + assert args.artemis_user == "artemis" + assert args.database_name == "workflow" + assert args.queue_list is None + assert args.interval == 600 + + # test setting queue list + args = parse_args(["--queue_list", "TEST_QUEUE", "TEST_QUEUE2", "TEST_QUEUE3"]) + assert args.queue_list == ["TEST_QUEUE", "TEST_QUEUE2", "TEST_QUEUE3"]