From f4e1d6d08d58c85de9845e4981216760c0700f5f Mon Sep 17 00:00:00 2001 From: Matthias Bernt Date: Mon, 28 Oct 2024 15:18:04 +0100 Subject: [PATCH] implement tasks API and adapt notification tests --- bioblend/_tests/GalaxyTestBase.py | 2 + bioblend/_tests/TestGalaxyNotifications.py | 78 +++++++++++++++------- bioblend/galaxy/__init__.py | 2 + bioblend/galaxy/tasks/__init__.py | 39 +++++++++++ 4 files changed, 97 insertions(+), 24 deletions(-) create mode 100755 bioblend/galaxy/tasks/__init__.py diff --git a/bioblend/_tests/GalaxyTestBase.py b/bioblend/_tests/GalaxyTestBase.py index 351cf6d91..d7dd6b748 100644 --- a/bioblend/_tests/GalaxyTestBase.py +++ b/bioblend/_tests/GalaxyTestBase.py @@ -18,12 +18,14 @@ @test_util.skip_unless_galaxy() class GalaxyTestBase(unittest.TestCase): gi: GalaxyInstance + config: Dict[str, Any] @classmethod def setUpClass(cls): galaxy_key = os.environ["BIOBLEND_GALAXY_API_KEY"] galaxy_url = os.environ["BIOBLEND_GALAXY_URL"] cls.gi = GalaxyInstance(url=galaxy_url, key=galaxy_key) + cls.config = cls.gi.config.get_config() def _test_dataset(self, history_id: str, contents: str = "1\t2\t3", **kwargs: Any) -> str: tool_output = self.gi.tools.paste_content(contents, history_id, **kwargs) diff --git a/bioblend/_tests/TestGalaxyNotifications.py b/bioblend/_tests/TestGalaxyNotifications.py index 4adb6bd04..52391b81b 100755 --- a/bioblend/_tests/TestGalaxyNotifications.py +++ b/bioblend/_tests/TestGalaxyNotifications.py @@ -9,6 +9,8 @@ Optional, ) +from galaxy.tool_util.verify.wait import wait_on + from bioblend.galaxy import GalaxyInstance from . import ( GalaxyTestBase, @@ -32,6 +34,8 @@ def test_notification_status(self): if not self.gi.users.get_current_user()["is_admin"]: self.skipTest("This tests requires the current user to be an admin, which is not the case.") + task_based = self.gi.config.get_config()["enable_celery_tasks"] + # user creation for the test user1 = self._create_local_test_user(password="password") user2 = self._create_local_test_user(password="password") @@ -56,14 +60,14 @@ def test_notification_status(self): [user1["id"]], message="test_notification_status 1", ) - assert created_response_1["total_notifications_sent"] == 1 + self._assert_notifications_sent(task_based, created_response_1, 1) # Both user1 and user2 will receive this notification created_response_2 = self._send_test_notification_to( [user1["id"], user2["id"]], message="test_notification_status 2", ) - assert created_response_2["total_notifications_sent"] == 2 + self._assert_notifications_sent(task_based, created_response_2, 2) # All users will receive this broadcasted notification self._send_test_broadcast_notification(message="test_notification_status 3") @@ -146,11 +150,16 @@ def test_get_user_notifications(self): if not self.gi.users.get_current_user()["is_admin"]: self.skipTest("This tests requires the current user to be an admin, which is not the case.") + task_based = self.gi.config.get_config()["enable_celery_tasks"] + # send the notifications user_id = [self.gi.users.get_current_user()["id"]] - self._send_test_notification_to(user_id, message="test_notification_status 1") - self._send_test_notification_to(user_id, message="test_notification_status 2") - self._send_test_notification_to(user_id, message="test_notification_status 3") + send_response = self._send_test_notification_to(user_id, message="test_notification_status 1") + self._assert_notifications_sent(task_based, send_response, 1) + send_response = self._send_test_notification_to(user_id, message="test_notification_status 2") + self._assert_notifications_sent(task_based, send_response, 1) + send_response = self._send_test_notification_to(user_id, message="test_notification_status 3") + self._assert_notifications_sent(task_based, send_response, 1) # this should fetch all the notifications created_response_1 = self.gi.notifications.get_user_notifications() @@ -239,6 +248,7 @@ def test_show_notification(self): self.skipTest("This Galaxy instance is not configured to use notifications.") if not self.gi.users.get_current_user()["is_admin"]: self.skipTest("This tests requires the current user to be an admin, which is not the case.") + task_based = self.gi.config.get_config()["enable_celery_tasks"] # user creation for the test user = self._create_local_test_user(password="password") @@ -247,10 +257,12 @@ def test_show_notification(self): user_gi = GalaxyInstance(url=self.gi.base_url, email=user["email"], password="password") # send the test notification - notification_response = self._send_test_notification_to([user["id"]], message="test_notification_status")[ - "notification" - ] - notification_id = notification_response["id"] + send_response = self._send_test_notification_to([user["id"]], message="test_notification_status") + self._assert_notifications_sent(task_based, send_response, 1) + + notifications = user_gi.notifications.get_user_notifications() + notification_id = notifications[0]["id"] + user = self.gi.notifications.get_user_notifications() # Fetch the notification notification = user_gi.notifications.show_notification(notification_id) @@ -269,6 +281,7 @@ def test_update_notifications(self): self.skipTest("This Galaxy instance is not configured to use notifications.") if not self.gi.users.get_current_user()["is_admin"]: self.skipTest("This tests requires the current user to be an admin, which is not the case.") + task_based = self.gi.config.get_config()["enable_celery_tasks"] # user creation for the test user = self._create_local_test_user(password="password") @@ -277,12 +290,14 @@ def test_update_notifications(self): user_gi = GalaxyInstance(url=self.gi.base_url, email=user["email"], password="password") # send the test notifications and save their ids - notification_1_id = self._send_test_notification_to([user["id"]], message="test_notification_status 1")[ - "notification" - ]["id"] - notification_2_id = self._send_test_notification_to([user["id"]], message="test_notification_status 2")[ - "notification" - ]["id"] + send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 1") + self._assert_notifications_sent(task_based, send_response, 1) + send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 2") + self._assert_notifications_sent(task_based, send_response, 1) + + notifications = user_gi.notifications.get_user_notifications() + notification_1_id = notifications[0]["id"] + notification_2_id = notifications[1]["id"] # fetch the notifications notification_1 = user_gi.notifications.show_notification(notification_1_id) @@ -349,6 +364,7 @@ def test_delete_notifications(self): self.skipTest("This Galaxy instance is not configured to use notifications.") if not self.gi.users.get_current_user()["is_admin"]: self.skipTest("This tests requires the current user to be an admin, which is not the case.") + task_based = self.gi.config.get_config()["enable_celery_tasks"] # user creation for the test user = self._create_local_test_user(password="password") @@ -357,15 +373,17 @@ def test_delete_notifications(self): user_gi = GalaxyInstance(url=self.gi.base_url, email=user["email"], password="password") # send the test notifications and save their ids - notification_1 = self._send_test_notification_to([user["id"]], message="test_notification_status 1") - print(notification_1) - notification_1_id = notification_1["notification"]["id"] - notification_2_id = self._send_test_notification_to([user["id"]], message="test_notification_status 2")[ - "notification" - ]["id"] - notification_3_id = self._send_test_notification_to([user["id"]], message="test_notification_status 3")[ - "notification" - ]["id"] + send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 1") + self._assert_notifications_sent(task_based, send_response, 1) + send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 2") + self._assert_notifications_sent(task_based, send_response, 1) + send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 3") + self._assert_notifications_sent(task_based, send_response, 1) + + notifications = user_gi.notifications.get_user_notifications() + notification_1_id = notifications[0]["id"] + notification_2_id = notifications[1]["id"] + notification_3_id = notifications[2]["id"] # delete a single notifications response_1 = user_gi.notifications.delete_user_notification(notification_id=notification_1_id) @@ -545,4 +563,16 @@ def _send_test_notification_to( user_ids=user_ids, expiration_time=(datetime.utcnow() + timedelta(days=1)), ) + print(f"_send_test_notification_to {notification=}") return notification + + def _assert_notifications_sent(self, task_based, response, expected_count: int = 0): + def task_success(): + return None if self.gi.tasks.get_task_status(task_id) != "SUCCESS" else True + + if task_based: + task_id = response["id"] + assert task_id is not None + wait_on(task_success, "Task successful", 60) + else: + assert response["total_notifications_sent"] == expected_count diff --git a/bioblend/galaxy/__init__.py b/bioblend/galaxy/__init__.py index 495eeead0..754f5619b 100644 --- a/bioblend/galaxy/__init__.py +++ b/bioblend/galaxy/__init__.py @@ -22,6 +22,7 @@ notifications, quotas, roles, + tasks, tool_data, tool_dependencies, tools, @@ -111,6 +112,7 @@ def __init__( self.folders = folders.FoldersClient(self) self.tool_dependencies = tool_dependencies.ToolDependenciesClient(self) self.notifications = notifications.NotificationClient(self) + self.tasks = tasks.TasksClient(self) def __repr__(self) -> str: """ diff --git a/bioblend/galaxy/tasks/__init__.py b/bioblend/galaxy/tasks/__init__.py new file mode 100755 index 000000000..f15ffa223 --- /dev/null +++ b/bioblend/galaxy/tasks/__init__.py @@ -0,0 +1,39 @@ +""" +Contains possible interaction dealing with Galaxy tasks. +""" + +from typing import ( + Any, + Dict, + TYPE_CHECKING, +) + +from bioblend.galaxy.client import Client + +if TYPE_CHECKING: + from uuid import UUID + from bioblend.galaxy import GalaxyInstance + + +class TasksClient(Client): + """ + This endpoint only works on Galaxy 22.05 or later. + """ + + module = "tasks" + + def __init__(self, galaxy_instance: "GalaxyInstance") -> None: + super().__init__(galaxy_instance) + + def get_task_status(self, task_id: "UUID") -> Dict[str, Any]: + """ + Determine state of task ID + + :type task_id: UUID + :param task_id: the task ID + + :rtype: str + :return: String indicating task state + """ + url = self._make_url() + f"/{task_id}/state" + return self._get(url=url)