Skip to content

Commit

Permalink
Merge pull request #3233 from digitalfabrik/feature/celery
Browse files Browse the repository at this point in the history
Set up Celery for Integreat Chat
  • Loading branch information
svenseeberg authored Nov 29, 2024
2 parents 15d888d + 3ca5bc9 commit 1328f87
Show file tree
Hide file tree
Showing 15 changed files with 284 additions and 110 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ jobs:
else
python3 -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
pip install -e .[dev-pinned,pinned]
fi
- save_cache:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,6 @@ integreat_cms/xliff/download

# Postgres folder
.postgres

# Celery
celerybeat-schedule.db
63 changes: 0 additions & 63 deletions integreat_cms/api/v3/chat/chat_bot.py

This file was deleted.

39 changes: 8 additions & 31 deletions integreat_cms/api/v3/chat/user_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

from ....cms.models import ABTester, AttachmentMap, Language, Region, UserChat
from ...decorators import json_response
from .chat_bot import ChatBot
from .zammad_api import ZammadChatAPI
from .utils.chat_bot import process_answer, process_user_message
from .utils.zammad_api import ZammadChatAPI

if TYPE_CHECKING:
from django.http import HttpRequest
Expand Down Expand Up @@ -231,11 +231,8 @@ def zammad_webhook(request: HttpRequest) -> JsonResponse:
)
if not region.integreat_chat_enabled:
return JsonResponse({"status": "Integreat Chat disabled"})
client = ZammadChatAPI(region)
webhook_message = json.loads(request.body)
message_text = webhook_message["article"]["body"]
zammad_chat = UserChat.objects.get(zammad_id=webhook_message["ticket"]["id"])
chat_bot = ChatBot()

actions = []
if webhook_message["article"]["internal"]:
Expand All @@ -249,34 +246,14 @@ def zammad_webhook(request: HttpRequest) -> JsonResponse:
webhook_message["article"]["created_by"]["login"]
== "[email protected]"
):
actions.append("question translation")
client.send_message(
zammad_chat.zammad_id,
chat_bot.automatic_translation(
message_text, zammad_chat.language.slug, region.default_language.slug
),
True,
True,
actions.append("question translation queued")
process_user_message.apply_async(
args=[message_text, region.slug, webhook_message["ticket"]["id"]]
)
if answer := chat_bot.automatic_answer(
message_text, region, zammad_chat.language.slug
):
actions.append("automatic answer")
client.send_message(
zammad_chat.zammad_id,
answer,
False,
True,
)
else:
actions.append("answer translation")
client.send_message(
zammad_chat.zammad_id,
chat_bot.automatic_translation(
message_text, region.default_language.slug, zammad_chat.language.slug
),
False,
True,
actions.append("answer translation queued")
process_answer.apply_async(
args=[message_text, region.slug, webhook_message["ticket"]["id"]]
)
return JsonResponse(
{
Expand Down
3 changes: 3 additions & 0 deletions integreat_cms/api/v3/chat/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
Utils for the Integreat Chat
"""
111 changes: 111 additions & 0 deletions integreat_cms/api/v3/chat/utils/chat_bot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""
Wrapper for the Chat Bot / LLM API
"""

from __future__ import annotations

import requests
from celery import shared_task
from django.conf import settings

from integreat_cms.cms.models import Region, UserChat
from integreat_cms.cms.utils.content_translation_utils import (
get_public_translation_for_link,
)

from .zammad_api import ZammadChatAPI


def format_message(response: dict) -> str:
"""
Transform JSON into readable message
"""
if "answer" not in response or not response["answer"]:
raise ValueError("Could not format message, no answer attribute in response")
if "sources" not in response or not response["sources"]:
return response["answer"]
sources = "".join(
[
f"<li><a href='{settings.WEBAPP_URL}{path}'>{title}</a></li>"
for path in response["sources"]
if (title := get_public_translation_for_link(settings.WEBAPP_URL + path))
]
)
return f"{response['answer']}\n<ul>{sources}</ul>"


def automatic_answer(message: str, region: Region, language_slug: str) -> str | None:
"""
Get automatic answer to question
"""
url = (
f"https://{settings.INTEGREAT_CHAT_BACK_END_DOMAIN}/chatanswers/extract_answer/"
)
body = {"message": message, "language": language_slug, "region": region.slug}
r = requests.post(url, json=body, timeout=120)
return format_message(r.json())


def automatic_translation(
message: str, source_language_slug: str, target_language_slug: str
) -> str:
"""
Use LLM to translate message
"""
url = f"https://{settings.INTEGREAT_CHAT_BACK_END_DOMAIN}/chatanswers/translate_message/"
body = {
"message": message,
"source_language": source_language_slug,
"target_language": target_language_slug,
}
response = requests.post(url, json=body, timeout=120).json()
if "status" in response and response["status"] == "success":
return response["translation"]
raise ValueError("Did not receive success response for translation request.")


@shared_task
def process_user_message(
message_text: str, region_slug: str, zammad_ticket_id: int
) -> None:
"""
Process the message from an Integreat App user
"""
zammad_chat = UserChat.objects.get(zammad_id=zammad_ticket_id)
region = Region.objects.get(slug=region_slug)
client = ZammadChatAPI(region)
if translation := automatic_translation(
message_text, zammad_chat.language.slug, region.default_language.slug
):
client.send_message(
zammad_chat.zammad_id,
translation,
True,
True,
)
if answer := automatic_answer(message_text, region, zammad_chat.language.slug):
client.send_message(
zammad_chat.zammad_id,
answer,
False,
True,
)


@shared_task
def process_answer(message_text: str, region_slug: str, zammad_ticket_id: int) -> None:
"""
Process automatic or counselor answers
"""
zammad_chat = UserChat.objects.get(zammad_id=zammad_ticket_id)
region = Region.objects.get(slug=region_slug)
client = ZammadChatAPI(region)
if translation := automatic_translation(
message_text, region.default_language.slug, zammad_chat.language.slug
):
client.send_message(
zammad_chat.zammad_id,
translation,
False,
True,
)
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from requests.exceptions import HTTPError
from zammad_py import ZammadAPI

from ....cms.models import AttachmentMap, Region, UserChat
from integreat_cms.cms.models import AttachmentMap, Region, UserChat

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -166,10 +166,20 @@ def get_messages(self, chat: UserChat) -> dict[str, dict | list[dict]]:

# pylint: disable=method-hidden
def send_message(
self, chat_id: int, message: str, internal: bool = False, auto: bool = False
self,
chat_id: int,
message: str,
internal: bool = False,
automatic_message: bool = False,
) -> dict:
"""
Post a new message to the given ticket
param chat_id: Zammad ID of the chat
param message: The message body
param internal: keep the message internal in Zammad (do not show to user)
param automatic_message: sets title to "automatically generated message"
return: dict with Zammad article data
"""
params = {
"ticket_id": chat_id,
Expand All @@ -178,9 +188,11 @@ def send_message(
"content_type": "text/html",
"internal": internal,
"subject": (
"automatically generated message" if auto else "app user message"
"automatically generated message"
if automatic_message
else "app user message"
),
"sender": "Customer" if not auto else "Agent",
"sender": "Customer" if not automatic_message else "Agent",
}
return self._parse_response( # type: ignore[return-value]
self._attempt_call(self.client.ticket_article.create, params=params)
Expand Down
47 changes: 46 additions & 1 deletion integreat_cms/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from django.core.exceptions import ImproperlyConfigured
from django.utils.translation import gettext_lazy as _

from ..nominatim_api.utils import BoundingBox
from integreat_cms.nominatim_api.utils import BoundingBox

from .logging_formatter import ColorFormatter, RequestFormatter
from .utils.strtobool import strtobool

Expand Down Expand Up @@ -372,6 +373,7 @@
"integreat_cms.nominatim_api",
"integreat_cms.summ_ai_api",
"integreat_cms.textlab_api",
"integreat_cms.integreat_celery",
# Installed Django apps
"django.contrib.auth",
"django.contrib.contenttypes",
Expand Down Expand Up @@ -1335,3 +1337,46 @@

#: Zammad ticket group used for Integreat chat messages
USER_CHAT_TICKET_GROUP: Final[str] = "integreat-chat"

#: Integreat Chat (app) backend server domain
INTEGREAT_CHAT_BACK_END_DOMAIN = "igchat-inference.tuerantuer.org"

##########
# CELERY #
##########

#: Configure Celery to use a custom time zone. The timezone value can be any time zone supported by the pytz library.
#: If not set the UTC timezone is used. For backwards compatibility there is also a CELERY_ENABLE_UTC setting,
#: and this is set to false the system local timezone is used instead.
CELERY_TIMEZONE = "UTC"

#: If True the task will report its status as ``started`` when the task is executed by a worker.
#: The default value is False as the normal behavior is to not report that level of granularity.
#: Tasks are either pending, finished, or waiting to be retried.
#: Having a ``started`` state can be useful for when there are long running tasks
#: and there’s a need to report what task is currently running.
CELERY_TASK_TRACK_STARTED = True

#: Task hard time limit in seconds. The worker processing the task will be killed
#: and replaced with a new one when this is exceeded.
CELERY_TASK_TIME_LIMIT = 60 * 60 * 1

#: Default broker URL.
CELERY_BROKER_URL = os.environ.get(
"CELERY_REDIS_URL",
(
"redis+socket:///var/run/redis/redis-server.sock"
if not DEBUG
else "redis://localhost:6379/0"
),
)

#: The backend used to store task results (tombstones). Disabled by default.
CELERY_RESULT_BACKEND = os.environ.get(
"CELERY_REDIS_URL",
(
"redis+socket:///var/run/redis/redis-server.sock"
if not DEBUG
else "redis://localhost:6379/0"
),
)
5 changes: 5 additions & 0 deletions integreat_cms/integreat_celery/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ("celery_app",)
14 changes: 14 additions & 0 deletions integreat_cms/integreat_celery/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""
Set up celery app
"""

from django.apps import AppConfig


class IntegreatCeleryConfig(AppConfig):
"""
Configuration for Celery
"""

default_auto_field = "django.db.models.BigAutoField"
name = "integreat_cms.integreat_celery"
Loading

0 comments on commit 1328f87

Please sign in to comment.