diff --git a/backend/AnimatedDrawings/application/animation_queue.py b/backend/AnimatedDrawings/application/animation_queue.py index 891d3cd23..059804a39 100644 --- a/backend/AnimatedDrawings/application/animation_queue.py +++ b/backend/AnimatedDrawings/application/animation_queue.py @@ -12,8 +12,9 @@ from application.config.queue_config import QueueConfig from application.model.motion import Motion from application.model.retarget import Retarget -from application.tasks import make_animation -from application.tasks import save_capsule_skin +from application.task.make_animation import MakeAnimation +from application.task.save_capsule_skin import SaveCapsuleSkin +from application.task.send_notification import SendNotification class AnimationQueueController: @@ -21,8 +22,12 @@ def __init__(self): self.queue_config = QueueConfig() self.require_keys = ['memberId', 'memberName', 'skinName', 'imageUrl', 'retarget', 'motionName'] - self.celery_work_queue_name = 'makeAnimationTask.queue' - self.celery_success_queue_name = 'saveCapsuleSkinTasks.queue' + self.celery_work_queue_name = 'makeAnimation.queue' + self.celery_success_queue_name = 'saveCapsuleSkin.queue' + self.celery_send_notification_queue_name = 'sendNotification.queue' + self.make_animation_task = MakeAnimation() + self.save_capsule_skin_task = SaveCapsuleSkin() + self.send_notification_task = SendNotification() def run(self): # rabbitmq 채널 연결 @@ -66,10 +71,17 @@ def callback( filename = '%s.gif' % uuid.uuid4() chain( - make_animation.s(json_object, filename).set( - queue=self.celery_work_queue_name), - save_capsule_skin.s(json_object, filename).set( - queue=self.celery_success_queue_name) + self.make_animation_task.s(input_data=json_object, + filename=filename) + .set(queue=self.celery_work_queue_name), + + self.save_capsule_skin_task.s(input_data=json_object, + filename=filename) + .set(queue=self.celery_success_queue_name), + + self.send_notification_task.s(input_data=json_object, + filename=filename) + .set(queue=self.celery_send_notification_queue_name) ).apply_async( ignore_result=True ) @@ -95,7 +107,9 @@ def parse_json(self, body: bytes): json_object['retarget'] = Retarget[json_object['retarget']].value json_object['motionName'] = Motion[json_object['motionName']].value - self.valid_json_object(json_object) + for key in self.require_keys: + if key not in json_object: + raise KeyError return json_object @@ -103,11 +117,6 @@ def parse_json(self, body: bytes): logging.exception('json 파싱 오류', e) raise e - def valid_json_object(self, json_object: dict) -> None: - for key in self.require_keys: - if key not in json_object: - raise KeyError - if __name__ == '__main__': application = AnimationQueueController() diff --git a/backend/AnimatedDrawings/application/celery_app.py b/backend/AnimatedDrawings/application/celery_app.py new file mode 100644 index 000000000..a9f59553f --- /dev/null +++ b/backend/AnimatedDrawings/application/celery_app.py @@ -0,0 +1,23 @@ +from celery import Celery +from kombu import Queue + +from application.config.queue_config import QueueConfig +from application.task.make_animation import MakeAnimation +from application.task.save_capsule_skin import SaveCapsuleSkin +from application.task.send_notification import SendNotification + +queue_config = QueueConfig() +celery = Celery('application', + broker=queue_config.get_queue_url(), + include=['application.task']) + +celery.conf.result_expires = 300 +celery.conf.task_queues = ( + Queue('makeAnimation.queue'), + Queue('saveCapsuleSkin.queue'), + Queue('sendNotification.queue') +) + +celery.register_task(MakeAnimation()) +celery.register_task(SaveCapsuleSkin()) +celery.register_task(SendNotification()) diff --git a/backend/AnimatedDrawings/application/handler.py b/backend/AnimatedDrawings/application/task/base_task.py similarity index 100% rename from backend/AnimatedDrawings/application/handler.py rename to backend/AnimatedDrawings/application/task/base_task.py diff --git a/backend/AnimatedDrawings/application/task/make_animation.py b/backend/AnimatedDrawings/application/task/make_animation.py new file mode 100644 index 000000000..431b13cab --- /dev/null +++ b/backend/AnimatedDrawings/application/task/make_animation.py @@ -0,0 +1,41 @@ +import os +import shutil +from pathlib import Path + +import requests + +from application.config.s3_config import S3Config +from application.s3.s3_connection import get_object_wrapper +from application.task.base_task import LogErrorsTask +from examples.annotations_to_animation import annotations_to_animation +from examples.image_to_annotations import image_to_annotations + + +class MakeAnimation(LogErrorsTask): + name = 'make_animation' + + def __init__(self): + self.s3_bucket_name = S3Config().s3_bucket_name + + def run(self, *args, **kwargs): + img_bytes = requests.get(kwargs['input_data']['imageUrl']).content + + output_directory = 'capsuleSkin/' + kwargs['input_data']['memberId'] + result = Path(output_directory) + result.mkdir(exist_ok=True) + + image_to_annotations(img_bytes, result) + annotations_to_animation(output_directory, + kwargs['input_data']['motionName'], + kwargs['input_data']['retarget']) + + with open(output_directory + '/video.gif', 'rb') as image: + gif_bytes = bytearray(image.read()) + + output_wrapper = get_object_wrapper(self.s3_bucket_name, + '%s/%s' % (output_directory, + kwargs['filename'])) + output_wrapper.put(gif_bytes) + + if os.path.exists(output_directory): + shutil.rmtree(output_directory) diff --git a/backend/AnimatedDrawings/application/task/save_capsule_skin.py b/backend/AnimatedDrawings/application/task/save_capsule_skin.py new file mode 100644 index 000000000..481d41d34 --- /dev/null +++ b/backend/AnimatedDrawings/application/task/save_capsule_skin.py @@ -0,0 +1,29 @@ +from sqlalchemy import create_engine +from sqlalchemy.orm import Session + +from application.config.database_config import DatabaseConfig +from application.task.base_task import LogErrorsTask +from application.model.capsule_skin import CapsuleSkin +from application.model.motion import Motion +from application.model.retarget import Retarget + + +class SaveCapsuleSkin(LogErrorsTask): + name = 'save_capsule_skin' + database_config = DatabaseConfig() + engine = create_engine(database_config.get_database_url()) + + def run(self, *args, **kwargs): + capsule_skin = CapsuleSkin(skin_name=kwargs['input_data']['skinName'], + image_url='capsuleSkin/%s/%s' % ( + kwargs['input_data']['memberId'], + kwargs['filename']), + motion_name=Motion( + kwargs['input_data']['motionName']).name, + retarget=Retarget( + kwargs['input_data']['retarget']).name, + member_id=kwargs['input_data']['memberId']) + + with Session(self.engine) as session: + session.add(capsule_skin) + session.commit() diff --git a/backend/AnimatedDrawings/application/task/send_notification.py b/backend/AnimatedDrawings/application/task/send_notification.py new file mode 100644 index 000000000..68636fd8f --- /dev/null +++ b/backend/AnimatedDrawings/application/task/send_notification.py @@ -0,0 +1,25 @@ +import requests + +from application.task.base_task import LogErrorsTask + + +class SendNotification(LogErrorsTask): + name = 'send_notification' + + def __init__(self): + self.title = '캡슐 스킨 생성이 완료되었습니다!' + self.notification_server_url = 'https://notification.archive-timecapsule.kro.kr/api/notification/capsule_skin/send' + + def run(self, *args, **kwargs): + request_data = { + 'memberId': kwargs['input_data']['memberId'], + 'skinName': kwargs['input_data']['skinName'], + 'title': self.title, + 'text': f"{kwargs['input_data']['skinName']}이 생성되었습니다. ARchive에서 확인해보세요!", + 'skinUrl': kwargs['filename'] + } + + requests.post(self.notification_server_url, + json=request_data, + verify=False, + timeout=5) diff --git a/backend/AnimatedDrawings/application/tasks.py b/backend/AnimatedDrawings/application/tasks.py deleted file mode 100644 index b4642db3a..000000000 --- a/backend/AnimatedDrawings/application/tasks.py +++ /dev/null @@ -1,95 +0,0 @@ -import os.path -import shutil -from pathlib import Path - -import requests -from celery import Celery -from kombu import Queue -from sqlalchemy import create_engine -from sqlalchemy.orm import Session - -from application.config.database_config import DatabaseConfig -from application.config.queue_config import QueueConfig -from application.config.s3_config import S3Config -from application.handler import LogErrorsTask -from application.model.capsule_skin import CapsuleSkin -from application.model.motion import Motion -from application.model.retarget import Retarget -from application.s3.s3_connection import get_object_wrapper -from examples.annotations_to_animation import annotations_to_animation -from examples.image_to_annotations import image_to_annotations - -queue_config = QueueConfig() -celery = Celery('tasks', - broker=queue_config.get_queue_url(), - include=['application.tasks']) - -celery.conf.result_expires = 300 -celery.conf.task_queues = ( - Queue('makeAnimationTask.queue'), - Queue('saveCapsuleSkinTasks.queue'), -) - -database_config = DatabaseConfig() -engine = create_engine(database_config.get_database_url()) - -s3_config = S3Config() - - -@celery.task(base=LogErrorsTask) -def make_animation(input_data: dict, filename: str) -> None: - img_bytes = requests.get(input_data['imageUrl']).content - - output_directory = image_to_animation(img_bytes, input_data) - - upload_gif_to_s3(output_directory, filename) - - clear_resource(output_directory) - - -def image_to_animation(img_bytes: bytes, input_data: dict) -> str: - output_directory = 'capsuleSkin/' + input_data['memberId'] - - output_path = create_directory(output_directory) - - image_to_annotations(img_bytes, output_path) - annotations_to_animation(output_directory, input_data['motionName'], - input_data['retarget']) - return output_directory - - -def create_directory(output_directory: str) -> Path: - result = Path(output_directory) - result.mkdir(exist_ok=True) - return result - - -def upload_gif_to_s3(output: str, filename: str) -> None: - gif_bytes = read_animation_result(output + '/video.gif') - - output_wrapper = get_object_wrapper(s3_config.s3_bucket_name, '%s/%s' % (output, filename)) - output_wrapper.put(gif_bytes) - - -def read_animation_result(output: str) -> bytes: - with open(output, 'rb') as image: - return bytearray(image.read()) - - -def clear_resource(output_directory: str) -> None: - if os.path.exists(output_directory): - shutil.rmtree(output_directory) - - -@celery.task(base=LogErrorsTask) -def save_capsule_skin(_, input_data: dict, filename: str) -> None: - capsule_skin = CapsuleSkin(skin_name=input_data['skinName'], - image_url='capsuleSkin/%s/%s' % ( - input_data['memberId'], filename), - motion_name=Motion(input_data['motionName']).name, - retarget=Retarget(input_data['retarget']).name, - member_id=input_data['memberId']) - - with Session(engine) as session: - session.add(capsule_skin) - session.commit() diff --git a/backend/AnimatedDrawings/supervisord.conf b/backend/AnimatedDrawings/supervisord.conf index afa94b944..cf4da2daa 100644 --- a/backend/AnimatedDrawings/supervisord.conf +++ b/backend/AnimatedDrawings/supervisord.conf @@ -8,7 +8,7 @@ password = 1234 [program:flower] directory = /app/application -command = /opt/conda/bin/conda run -n animated_drawings celery -A tasks flower --conf="/app/application/config/flowerconfig.py" +command = /opt/conda/bin/conda run -n animated_drawings celery -A celery_app flower --conf="/app/application/config/flowerconfig.py" priority = 100 stdout_logfile = /var/log/flower.log stderr_logfile = /var/log/flower.err @@ -23,7 +23,7 @@ environment=ENVIRONMENT=%(ENV_ENVIRONMENT)s,PYOPENGL_PLATFORM="osmesa" [program:celery] directory = /app/application -command = /opt/conda/bin/conda run -n animated_drawings celery -A tasks worker +command = /opt/conda/bin/conda run -n animated_drawings celery -A celery_app worker priority = 200 stdout_logfile = /var/log/celeryd.log stderr_logfile = /var/log/celeryd.err