Skip to content

Commit

Permalink
implement tasks API and adapt notification tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bernt-matthias committed Oct 28, 2024
1 parent d9dcad8 commit f4e1d6d
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 24 deletions.
2 changes: 2 additions & 0 deletions bioblend/_tests/GalaxyTestBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
@test_util.skip_unless_galaxy()
class GalaxyTestBase(unittest.TestCase):
gi: GalaxyInstance
config: Dict[str, Any]

@classmethod
def setUpClass(cls):
galaxy_key = os.environ["BIOBLEND_GALAXY_API_KEY"]
galaxy_url = os.environ["BIOBLEND_GALAXY_URL"]
cls.gi = GalaxyInstance(url=galaxy_url, key=galaxy_key)
cls.config = cls.gi.config.get_config()

def _test_dataset(self, history_id: str, contents: str = "1\t2\t3", **kwargs: Any) -> str:
tool_output = self.gi.tools.paste_content(contents, history_id, **kwargs)
Expand Down
78 changes: 54 additions & 24 deletions bioblend/_tests/TestGalaxyNotifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
Optional,
)

from galaxy.tool_util.verify.wait import wait_on

from bioblend.galaxy import GalaxyInstance
from . import (
GalaxyTestBase,
Expand All @@ -32,6 +34,8 @@ def test_notification_status(self):
if not self.gi.users.get_current_user()["is_admin"]:
self.skipTest("This tests requires the current user to be an admin, which is not the case.")

task_based = self.gi.config.get_config()["enable_celery_tasks"]

# user creation for the test
user1 = self._create_local_test_user(password="password")
user2 = self._create_local_test_user(password="password")
Expand All @@ -56,14 +60,14 @@ def test_notification_status(self):
[user1["id"]],
message="test_notification_status 1",
)
assert created_response_1["total_notifications_sent"] == 1
self._assert_notifications_sent(task_based, created_response_1, 1)

# Both user1 and user2 will receive this notification
created_response_2 = self._send_test_notification_to(
[user1["id"], user2["id"]],
message="test_notification_status 2",
)
assert created_response_2["total_notifications_sent"] == 2
self._assert_notifications_sent(task_based, created_response_2, 2)

# All users will receive this broadcasted notification
self._send_test_broadcast_notification(message="test_notification_status 3")
Expand Down Expand Up @@ -146,11 +150,16 @@ def test_get_user_notifications(self):
if not self.gi.users.get_current_user()["is_admin"]:
self.skipTest("This tests requires the current user to be an admin, which is not the case.")

task_based = self.gi.config.get_config()["enable_celery_tasks"]

# send the notifications
user_id = [self.gi.users.get_current_user()["id"]]
self._send_test_notification_to(user_id, message="test_notification_status 1")
self._send_test_notification_to(user_id, message="test_notification_status 2")
self._send_test_notification_to(user_id, message="test_notification_status 3")
send_response = self._send_test_notification_to(user_id, message="test_notification_status 1")
self._assert_notifications_sent(task_based, send_response, 1)
send_response = self._send_test_notification_to(user_id, message="test_notification_status 2")
self._assert_notifications_sent(task_based, send_response, 1)
send_response = self._send_test_notification_to(user_id, message="test_notification_status 3")
self._assert_notifications_sent(task_based, send_response, 1)

# this should fetch all the notifications
created_response_1 = self.gi.notifications.get_user_notifications()
Expand Down Expand Up @@ -239,6 +248,7 @@ def test_show_notification(self):
self.skipTest("This Galaxy instance is not configured to use notifications.")
if not self.gi.users.get_current_user()["is_admin"]:
self.skipTest("This tests requires the current user to be an admin, which is not the case.")
task_based = self.gi.config.get_config()["enable_celery_tasks"]

# user creation for the test
user = self._create_local_test_user(password="password")
Expand All @@ -247,10 +257,12 @@ def test_show_notification(self):
user_gi = GalaxyInstance(url=self.gi.base_url, email=user["email"], password="password")

# send the test notification
notification_response = self._send_test_notification_to([user["id"]], message="test_notification_status")[
"notification"
]
notification_id = notification_response["id"]
send_response = self._send_test_notification_to([user["id"]], message="test_notification_status")
self._assert_notifications_sent(task_based, send_response, 1)

notifications = user_gi.notifications.get_user_notifications()
notification_id = notifications[0]["id"]
user = self.gi.notifications.get_user_notifications()

# Fetch the notification
notification = user_gi.notifications.show_notification(notification_id)
Expand All @@ -269,6 +281,7 @@ def test_update_notifications(self):
self.skipTest("This Galaxy instance is not configured to use notifications.")
if not self.gi.users.get_current_user()["is_admin"]:
self.skipTest("This tests requires the current user to be an admin, which is not the case.")
task_based = self.gi.config.get_config()["enable_celery_tasks"]

# user creation for the test
user = self._create_local_test_user(password="password")
Expand All @@ -277,12 +290,14 @@ def test_update_notifications(self):
user_gi = GalaxyInstance(url=self.gi.base_url, email=user["email"], password="password")

# send the test notifications and save their ids
notification_1_id = self._send_test_notification_to([user["id"]], message="test_notification_status 1")[
"notification"
]["id"]
notification_2_id = self._send_test_notification_to([user["id"]], message="test_notification_status 2")[
"notification"
]["id"]
send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 1")
self._assert_notifications_sent(task_based, send_response, 1)
send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 2")
self._assert_notifications_sent(task_based, send_response, 1)

notifications = user_gi.notifications.get_user_notifications()
notification_1_id = notifications[0]["id"]
notification_2_id = notifications[1]["id"]

# fetch the notifications
notification_1 = user_gi.notifications.show_notification(notification_1_id)
Expand Down Expand Up @@ -349,6 +364,7 @@ def test_delete_notifications(self):
self.skipTest("This Galaxy instance is not configured to use notifications.")
if not self.gi.users.get_current_user()["is_admin"]:
self.skipTest("This tests requires the current user to be an admin, which is not the case.")
task_based = self.gi.config.get_config()["enable_celery_tasks"]

# user creation for the test
user = self._create_local_test_user(password="password")
Expand All @@ -357,15 +373,17 @@ def test_delete_notifications(self):
user_gi = GalaxyInstance(url=self.gi.base_url, email=user["email"], password="password")

# send the test notifications and save their ids
notification_1 = self._send_test_notification_to([user["id"]], message="test_notification_status 1")
print(notification_1)
notification_1_id = notification_1["notification"]["id"]
notification_2_id = self._send_test_notification_to([user["id"]], message="test_notification_status 2")[
"notification"
]["id"]
notification_3_id = self._send_test_notification_to([user["id"]], message="test_notification_status 3")[
"notification"
]["id"]
send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 1")
self._assert_notifications_sent(task_based, send_response, 1)
send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 2")
self._assert_notifications_sent(task_based, send_response, 1)
send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 3")
self._assert_notifications_sent(task_based, send_response, 1)

notifications = user_gi.notifications.get_user_notifications()
notification_1_id = notifications[0]["id"]
notification_2_id = notifications[1]["id"]
notification_3_id = notifications[2]["id"]

# delete a single notifications
response_1 = user_gi.notifications.delete_user_notification(notification_id=notification_1_id)
Expand Down Expand Up @@ -545,4 +563,16 @@ def _send_test_notification_to(
user_ids=user_ids,
expiration_time=(datetime.utcnow() + timedelta(days=1)),
)
print(f"_send_test_notification_to {notification=}")
return notification

def _assert_notifications_sent(self, task_based, response, expected_count: int = 0):
def task_success():
return None if self.gi.tasks.get_task_status(task_id) != "SUCCESS" else True

if task_based:
task_id = response["id"]
assert task_id is not None
wait_on(task_success, "Task successful", 60)
else:
assert response["total_notifications_sent"] == expected_count
2 changes: 2 additions & 0 deletions bioblend/galaxy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
notifications,
quotas,
roles,
tasks,
tool_data,
tool_dependencies,
tools,
Expand Down Expand Up @@ -111,6 +112,7 @@ def __init__(
self.folders = folders.FoldersClient(self)
self.tool_dependencies = tool_dependencies.ToolDependenciesClient(self)
self.notifications = notifications.NotificationClient(self)
self.tasks = tasks.TasksClient(self)

def __repr__(self) -> str:
"""
Expand Down
39 changes: 39 additions & 0 deletions bioblend/galaxy/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""
Contains possible interaction dealing with Galaxy tasks.
"""

from typing import (
Any,
Dict,
TYPE_CHECKING,
)

from bioblend.galaxy.client import Client

if TYPE_CHECKING:
from uuid import UUID
from bioblend.galaxy import GalaxyInstance


class TasksClient(Client):
"""
This endpoint only works on Galaxy 22.05 or later.
"""

module = "tasks"

def __init__(self, galaxy_instance: "GalaxyInstance") -> None:
super().__init__(galaxy_instance)

def get_task_status(self, task_id: "UUID") -> Dict[str, Any]:
"""
Determine state of task ID
:type task_id: UUID
:param task_id: the task ID
:rtype: str
:return: String indicating task state
"""
url = self._make_url() + f"/{task_id}/state"
return self._get(url=url)

0 comments on commit f4e1d6d

Please sign in to comment.