diff --git a/.gitmodules b/.gitmodules index cc543b9a1..7a57751ee 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,6 @@ [submodule "backend/notification/src/main/resources/config"] path = backend/notification/src/main/resources/config url = https://github.com/seokho-1116/DroidBlossom-config-notification.git +[submodule "backend/AnimatedDrawings/application/config/yml"] + path = backend/AnimatedDrawings/application/config/yml + url = https://github.com/seokho-1116/DroidBlosson-config-animation.git diff --git a/backend/AnimatedDrawings/application/__init__.py b/backend/AnimatedDrawings/application/__init__.py index e69de29bb..448c9fecb 100644 --- a/backend/AnimatedDrawings/application/__init__.py +++ b/backend/AnimatedDrawings/application/__init__.py @@ -0,0 +1,2 @@ +import celery_app +__all__ = ('celery_app',) \ No newline at end of file diff --git a/backend/AnimatedDrawings/application/animation_queue.py b/backend/AnimatedDrawings/application/animation_queue.py index da9df4717..b685d2159 100644 --- a/backend/AnimatedDrawings/application/animation_queue.py +++ b/backend/AnimatedDrawings/application/animation_queue.py @@ -1,22 +1,16 @@ -import json -import logging import uuid -from _xxsubinterpreters import ChannelClosedError -from json.decoder import JSONDecodeError +from ast import literal_eval -import pika from celery import chain -from pika.adapters.blocking_connection import BlockingChannel -from pika.spec import Basic, BasicProperties +from kombu import Exchange, Queue from application.config.queue_config import QueueConfig +from application.logging.logger_factory import LoggerFactory from application.model.motion import Motion from application.model.retarget import Retarget -from application.task.make_animation import MakeAnimation -from application.task.save_capsule_skin import SaveCapsuleSkin -from application.task.send_notification import SendNotification - -logger = logging.getLogger('animation_queue_controller') +from application.task.tasks import create_animation, save_capsule_skin, \ + send_notification +from kombu_connection_pool import connection, connections class AnimationQueueController: @@ -24,99 +18,100 @@ def __init__(self): self.queue_config = QueueConfig() self.require_keys = ['memberId', 'memberName', 'skinName', 'imageUrl', 'retarget', 'motionName'] - self.celery_work_queue_name = 'makeAnimation.queue' - self.celery_success_queue_name = 'saveCapsuleSkin.queue' - self.celery_send_notification_queue_name = 'sendNotification.queue' - self.make_animation_task = MakeAnimation() - self.save_capsule_skin_task = SaveCapsuleSkin() - self.send_notification_task = SendNotification() + self.celery_work_queue_name = 'task.makeAnimation.queue' + self.celery_success_queue_name = 'task.saveCapsuleSkin.queue' + self.celery_send_notification_queue_name = 'task.sendNotification.queue' + self.logger = LoggerFactory.get_logger(__name__) def run(self): - # rabbitmq 채널 연결 - connection = pika.BlockingConnection( - pika.ConnectionParameters(host=self.queue_config.queue_host)) - channel = connection.channel() - - channel.queue_declare(queue=self.queue_config.queue_name, durable=True) - - channel.basic_consume(queue=self.queue_config.queue_name, - on_message_callback=self.callback, - auto_ack=False) - - try: - channel.start_consuming() - except ChannelClosedError as e: - logger.info("커넥션 연결 오류") - raise e - finally: - channel.close() + """ + 큐 메시지 리스닝 + :return: + """ + capsule_skin_exchange = Exchange( + name=QueueConfig.CAPSULE_SKIN_REQUEST_EXCHANGE_NAME, + type='direct', + durable=True) + capsule_skin_queue = Queue( + name=QueueConfig.CAPSULE_SKIN_REQUEST_QUEUE_NAME, + exchange=capsule_skin_exchange, + routing_key=QueueConfig.CAPSULE_SKIN_REQUEST_QUEUE_NAME) + + with connections[connection].acquire(block=True) as conn: + with conn.Consumer(queues=[capsule_skin_queue], + callbacks=[self.callback], + accept=['json']): + self.logger.info('메시지 수신 시작') + while True: + conn.drain_events() def callback( self, - channel: BlockingChannel, - method: Basic.Deliver, - header: BasicProperties, - body: bytes, + body: [dict, str, bytes], + message: any ) -> None: """ - queue에서 message consume 시 동작하는 callback - celery worker한테 animation 생성 작업을 실행한다 - :param channel: 큐와 연결된 채널 - :param method: 메시지의 상태 - :param header: 기본 정보 - :param body: queue로부터 넘어온 데이터 + 큐에 메시지가 도착했을 때 동작하는 콜백 함수 + :param body: + :param message: + :return: """ - logger.info('큐 메시지 처리 시작 %s', header.message_id) try: - json_object = self.parse_json(body) + self.logger.debug('메시지 수신 완료, 콜백 동작') + parsed_data = self.parse_body(body) - filename = f"capsuleSkin/{json_object['memberId']}/{uuid.uuid4()}.gif" + filename = f"capsuleSkin/{parsed_data['memberId']}/{uuid.uuid4()}.gif" chain( - self.make_animation_task.s(input_data=json_object, - filename=filename) + create_animation.s(input_data=parsed_data, + filename=filename) .set(queue=self.celery_work_queue_name), - self.save_capsule_skin_task.s(input_data=json_object, - filename=filename) + save_capsule_skin.s(input_data=parsed_data, + filename=filename) .set(queue=self.celery_success_queue_name), - self.send_notification_task.s(input_data=json_object, - filename=filename) + send_notification.s(input_data=parsed_data, + filename=filename) .set(queue=self.celery_send_notification_queue_name) ).apply_async( ignore_result=True ) - channel.basic_ack(delivery_tag=method.delivery_tag) + message.ack() + self.logger.debug('celery에 작업 전달 완료') except Exception as e: - logger.exception('메시지 처리 오류', e) - channel.basic_reject(delivery_tag=method.delivery_tag, - requeue=False) + self.logger.exception('작업 큐 메시지 처리 오류 %r', e) + message.reject() - def parse_json(self, body: bytes): + def parse_body(self, body: [dict, str, bytes]) -> dict: """ - json bytes를 파싱해 dict로 반환하는 함수 - :param body: queue로부터 넘어온 json bytes - :return: body에서 파싱된 dict - - :raises JSONDecodeError: 유효하지 않은 json 형태인 경우 - :raise TypeError: 잘못된 json 입력 타입인 경우 + 큐로부터 온 메시지 파싱 함수 + :param body: 큐에서 온 메시지 + :return: 파싱된 메시지 dict """ try: - json_object = json.loads(body.decode(encoding='utf8')) - json_object['memberId'] = str(json_object['memberId']) - json_object['retarget'] = Retarget[json_object['retarget']].value - json_object['motionName'] = Motion[json_object['motionName']].value + if isinstance(body, str): + dict_data = literal_eval(body) + elif isinstance(body, dict): + dict_data = body + elif isinstance(body, bytes): + dict_data = literal_eval(body.decode('utf-8')) + else: + self.logger.error('처리할 수 없는 타입 오류') + raise TypeError('처리할 수 없는 타입입니다') + + dict_data['retarget'] = Retarget[dict_data['retarget']].value + dict_data['motionName'] = Motion[dict_data['motionName']].value for key in self.require_keys: - if key not in json_object: + if key not in dict_data: raise KeyError - return json_object + return dict_data - except (JSONDecodeError, KeyError, TypeError) as e: - logger.exception('json 파싱 오류', e) + except (KeyError, TypeError) as e: + self.logger.exception('작업 큐 메시지 파싱 오류 %r', e) raise e diff --git a/backend/AnimatedDrawings/application/celery_app.py b/backend/AnimatedDrawings/application/celery_app.py index a9f59553f..151a098a9 100644 --- a/backend/AnimatedDrawings/application/celery_app.py +++ b/backend/AnimatedDrawings/application/celery_app.py @@ -2,22 +2,15 @@ from kombu import Queue from application.config.queue_config import QueueConfig -from application.task.make_animation import MakeAnimation -from application.task.save_capsule_skin import SaveCapsuleSkin -from application.task.send_notification import SendNotification queue_config = QueueConfig() -celery = Celery('application', - broker=queue_config.get_queue_url(), - include=['application.task']) +celery = Celery('application.task', + broker=queue_config.get_celery_broker_url(), + include=['application.task.tasks']) celery.conf.result_expires = 300 celery.conf.task_queues = ( - Queue('makeAnimation.queue'), - Queue('saveCapsuleSkin.queue'), - Queue('sendNotification.queue') + Queue('task.makeAnimation.queue'), + Queue('task.saveCapsuleSkin.queue'), + Queue('task.sendNotification.queue') ) - -celery.register_task(MakeAnimation()) -celery.register_task(SaveCapsuleSkin()) -celery.register_task(SendNotification()) diff --git a/backend/AnimatedDrawings/application/config/database_config.py b/backend/AnimatedDrawings/application/config/database_config.py index cc52ba377..0b4cc8470 100644 --- a/backend/AnimatedDrawings/application/config/database_config.py +++ b/backend/AnimatedDrawings/application/config/database_config.py @@ -1,18 +1,21 @@ -from urllib.parse import urlparse, quote_plus +from sqlalchemy import URL from application.config.root_config import RootConfig -class DatabaseConfig(RootConfig): - def __init__(self): - self._db_username = self._config_file['spring']['datasource'][ - 'username'] - self._db_password = self._config_file['spring']['datasource'][ - 'password'] - self._db_url = urlparse( - self._config_file['spring']['datasource']['url']) +class DatabaseConfig: + USER_NAME = RootConfig.CONFIG_FILE['database']['username'] + PASSWORD = RootConfig.CONFIG_FILE['database']['password'] + DB_URL = RootConfig.CONFIG_FILE['database']['url'] + DB_NAME = RootConfig.CONFIG_FILE['database']['database_name'] - def get_database_url(self) -> str: - url = self._db_url.path.split("//")[1] - return 'mysql+pymysql://%s:%s@%s' % ( - self._db_username, quote_plus(self._db_password), url) + @staticmethod + def get_database_url() -> URL: + url_object = URL.create( + "mysql+pymysql", + username=DatabaseConfig.USER_NAME, + password=DatabaseConfig.PASSWORD, + host=DatabaseConfig.DB_URL, + database=DatabaseConfig.DB_NAME + ) + return url_object diff --git a/backend/AnimatedDrawings/application/config/flowerconfig.py b/backend/AnimatedDrawings/application/config/flowerconfig.py index eeccd1619..28f43ced4 100644 --- a/backend/AnimatedDrawings/application/config/flowerconfig.py +++ b/backend/AnimatedDrawings/application/config/flowerconfig.py @@ -1,19 +1,9 @@ from application.config.root_config import RootConfig -class FlowerConfig(RootConfig): - def __init__(self): - self._username = self._config_file['flower']['username'] - self._password = self._config_file['flower']['password'] +class FlowerConfig: + USERNAME = RootConfig.CONFIG_FILE['flower']['username'] + PASSWORD = RootConfig.CONFIG_FILE['flower']['password'] - @property - def username(self): - return self._username - @property - def password(self): - return self._password - - -config = FlowerConfig() -basic_auth = "%s:%s" % (config.username, config.password) +basic_auth = f'{FlowerConfig.USERNAME}:{FlowerConfig.PASSWORD}' diff --git a/backend/AnimatedDrawings/application/config/logger_config.py b/backend/AnimatedDrawings/application/config/logger_config.py new file mode 100644 index 000000000..7c2d93b26 --- /dev/null +++ b/backend/AnimatedDrawings/application/config/logger_config.py @@ -0,0 +1,10 @@ +from application.config.root_config import RootConfig + + +class LoggerConfig: + LOGGING_LEVEL = RootConfig.CONFIG_FILE['logging']['level'] + CELERY_OUTPUT_FILE_PATH = RootConfig.CONFIG_FILE['logging'][ + 'celery_output_file_path'] + APPLICATION_OUTPUT_FILE_PATH = RootConfig.CONFIG_FILE['logging'][ + 'application_output_file_path'] + FORMAT_STRING = RootConfig.CONFIG_FILE['logging']['format'] diff --git a/backend/AnimatedDrawings/application/config/queue_config.py b/backend/AnimatedDrawings/application/config/queue_config.py index 748933187..04d70546a 100644 --- a/backend/AnimatedDrawings/application/config/queue_config.py +++ b/backend/AnimatedDrawings/application/config/queue_config.py @@ -1,24 +1,27 @@ from application.config.root_config import RootConfig -class QueueConfig(RootConfig): - def __init__(self): - self._queue_username = self._config_file['spring']['rabbitmq']['username'] - self._queue_host = self._config_file['spring']['rabbitmq']['host'] - self._queue_password = self._config_file['spring']['rabbitmq']['password'] - self._queue_virtual_host = self._config_file['spring']['rabbitmq']['virtual-host'] - self._queue_name = 'capsuleSkin_delay.queue' +class QueueConfig: + USERNAME = RootConfig.CONFIG_FILE['rabbitmq']['username'] + BROKER_HOST = RootConfig.CONFIG_FILE['rabbitmq']['host'] + PASSWORD = RootConfig.CONFIG_FILE['rabbitmq']['password'] + VIRTUAL_HOST = RootConfig.CONFIG_FILE['rabbitmq']['virtual-host'] + CAPSULE_SKIN_REQUEST_QUEUE_NAME = RootConfig.CONFIG_FILE['rabbitmq'][ + 'queue_name'] + CAPSULE_SKIN_REQUEST_EXCHANGE_NAME = RootConfig.CONFIG_FILE['rabbitmq'][ + 'exchange_name'] + NOTIFICATION_EXCHANGE_NAME = RootConfig.CONFIG_FILE['rabbitmq'][ + 'notification_exchange_name'] + NOTIFICATION_QUEUE_NAME = RootConfig.CONFIG_FILE['rabbitmq'][ + 'notification_queue_name'] - def get_queue_url(self) -> str: - return 'pyamqp://%s:%s@%s:5672%s' % (self._queue_username, - self._queue_password, - self._queue_host, - self._queue_virtual_host) + @staticmethod + def get_celery_broker_url() -> str: + return 'pyamqp://%s:%s@%s:5672%s' % (QueueConfig.USERNAME, + QueueConfig.PASSWORD, + QueueConfig.BROKER_HOST, + QueueConfig.VIRTUAL_HOST) - @property - def queue_name(self) -> str: - return self._queue_name - - @property - def queue_host(self) -> str: - return self._queue_host + @staticmethod + def get_kombu_broker_url() -> str: + return f'amqp://{QueueConfig.USERNAME}:{QueueConfig.PASSWORD}@{QueueConfig.BROKER_HOST}/{QueueConfig.VIRTUAL_HOST}' diff --git a/backend/AnimatedDrawings/application/config/root_config.py b/backend/AnimatedDrawings/application/config/root_config.py index 22a4fcce2..555eb8de1 100644 --- a/backend/AnimatedDrawings/application/config/root_config.py +++ b/backend/AnimatedDrawings/application/config/root_config.py @@ -3,9 +3,12 @@ import yaml environment = os.environ.get('ENVIRONMENT', 'local') -APPLICATION_YAML_PATH = f'config/yaml/application-{environment}.yml' +APPLICATION_YAML_PATH = f'config/yml/config-{environment}.yml' class RootConfig: - with open(APPLICATION_YAML_PATH) as file: - _config_file = yaml.safe_load(file) + CONFIG_FILE = None + + if CONFIG_FILE is None: + with open(APPLICATION_YAML_PATH) as file: + CONFIG_FILE = yaml.safe_load(file) diff --git a/backend/AnimatedDrawings/application/config/s3_config.py b/backend/AnimatedDrawings/application/config/s3_config.py index ee95d91f8..83251298e 100644 --- a/backend/AnimatedDrawings/application/config/s3_config.py +++ b/backend/AnimatedDrawings/application/config/s3_config.py @@ -2,19 +2,6 @@ class S3Config(RootConfig): - def __init__(self): - self._access_key_id = self._config_file['s3']['accessKey'] - self._secret_access_key = self._config_file['s3']['secretKey'] - self._s3_bucket_name = self._config_file['s3']['bucket'] - - @property - def access_key(self) -> str: - return self._access_key_id - - @property - def secret_key(self) -> str: - return self._secret_access_key - - @property - def s3_bucket_name(self) -> str: - return self._s3_bucket_name + ACCESS_KEY_ID = RootConfig.CONFIG_FILE['s3']['accessKey'] + SECRET_ACCESS_KEY = RootConfig.CONFIG_FILE['s3']['secretKey'] + S3_BUCKET_NAME = RootConfig.CONFIG_FILE['s3']['bucket'] diff --git a/backend/AnimatedDrawings/application/config/torchserve_config.py b/backend/AnimatedDrawings/application/config/torchserve_config.py index a907d6fc2..fe740a2a4 100644 --- a/backend/AnimatedDrawings/application/config/torchserve_config.py +++ b/backend/AnimatedDrawings/application/config/torchserve_config.py @@ -1,10 +1,5 @@ from application.config.root_config import RootConfig -class TorchserveConfig(RootConfig): - def __init__(self): - self._torchserve_host = self._config_file['docker_torchserve']['host'] - - @property - def torchserve_host(self) -> str: - return self._torchserve_host +class TorchserveConfig: + TORCHSERVE_HOST = RootConfig.CONFIG_FILE['docker_torchserve']['host'] diff --git a/backend/AnimatedDrawings/application/config/yml b/backend/AnimatedDrawings/application/config/yml new file mode 160000 index 000000000..4a9107419 --- /dev/null +++ b/backend/AnimatedDrawings/application/config/yml @@ -0,0 +1 @@ +Subproject commit 4a910741904d7becc9fc7c04d02f59a2fd1be493 diff --git a/backend/AnimatedDrawings/application/kombu_connection_pool.py b/backend/AnimatedDrawings/application/kombu_connection_pool.py new file mode 100644 index 000000000..db57e3727 --- /dev/null +++ b/backend/AnimatedDrawings/application/kombu_connection_pool.py @@ -0,0 +1,48 @@ +from kombu import Connection, Queue, Exchange, pools + +from application.config.queue_config import QueueConfig +from application.logging.logger_factory import LoggerFactory + +logger = LoggerFactory.get_logger(__name__) + +""" +kombu 커넥션 풀 설정 및 큐, 익스체인지 선언 +""" + + +def errback(exc, interval): + logger.error('레빗 엠큐 커넥션 풀 연결 에러 발생 : %r', exc) + logger.info('레빗 엠큐 커넥션 풀 연결 재시도: %s초 후', interval) + + +connections = pools.Connections(limit=2) +producers = pools.Producers(limit=2) + +connection = Connection(hostname=QueueConfig.get_kombu_broker_url(), + connect_timeout=7, + errback=errback) + +logger.info('레빗 엠큐 커넥션 풀 연결 설정 완료') + +with connections[connection].acquire(block=True) as conn: + with conn.channel() as channel: + capsule_skin_exchange = Exchange( + name=QueueConfig.CAPSULE_SKIN_REQUEST_EXCHANGE_NAME, + type='direct', + durable=True) + capsule_skin_queue = Queue( + name=QueueConfig.CAPSULE_SKIN_REQUEST_QUEUE_NAME, + exchange=capsule_skin_exchange, + routing_key=QueueConfig.CAPSULE_SKIN_REQUEST_QUEUE_NAME) + capsule_skin_queue.declare(channel=channel) + + notification_exchange = Exchange( + name=QueueConfig.NOTIFICATION_EXCHANGE_NAME, + type='direct', + durable=True) + + notification_queue = Queue(name=QueueConfig.NOTIFICATION_QUEUE_NAME, + exchange=notification_exchange, + routing_key=QueueConfig.NOTIFICATION_QUEUE_NAME) + notification_queue.declare(channel=channel) +logger.info('레빗 엠큐 큐, 익스체인지 설정 완료') diff --git a/backend/AnimatedDrawings/application/logging/logger_factory.py b/backend/AnimatedDrawings/application/logging/logger_factory.py new file mode 100644 index 000000000..b0f1f4f3c --- /dev/null +++ b/backend/AnimatedDrawings/application/logging/logger_factory.py @@ -0,0 +1,51 @@ +import logging +import sys + +from application.config.logger_config import LoggerConfig + + +class LoggerFactory: + + @staticmethod + def get_logger(name: str) -> logging.Logger: + """ + 기본 로거를 생성해준다. + :param name: 로거의 이름 + :return: 설정된 로거를 반환한다. + """ + logger = logging.getLogger(name) + logger.setLevel(LoggerConfig.LOGGING_LEVEL) + + LoggerFactory._setup_handler(LoggerConfig.FORMAT_STRING, + LoggerConfig.LOGGING_LEVEL, + logger, + LoggerConfig.APPLICATION_OUTPUT_FILE_PATH) + + return logger + + @staticmethod + def setup_logger(logger: logging.Logger, output_file_path: str) -> None: + """ + 파라미터로 받은 로거에 포맷터를 설정한다. + :param logger: 설정할 로거 + :param output_file_path: 로그를 기록할 파일의 경로 + """ + LoggerFactory._setup_handler(LoggerConfig.FORMAT_STRING, + LoggerConfig.LOGGING_LEVEL, + logger, + output_file_path) + + @staticmethod + def _setup_handler(format_string, level, logger, output_file_path): + formatter = logging.Formatter(format_string) + + stream_handler = logging.StreamHandler(sys.stdout) + stream_handler.setLevel(level) + stream_handler.setFormatter(formatter) + + file_handler = logging.FileHandler(filename=output_file_path) + file_handler.setLevel(level) + file_handler.setFormatter(formatter) + + logger.addHandler(file_handler) + logger.addHandler(stream_handler) diff --git a/backend/AnimatedDrawings/application/s3/object_wrapper.py b/backend/AnimatedDrawings/application/s3/object_wrapper.py index e686ec5b8..736adbc33 100644 --- a/backend/AnimatedDrawings/application/s3/object_wrapper.py +++ b/backend/AnimatedDrawings/application/s3/object_wrapper.py @@ -20,7 +20,7 @@ def put(self, data: bytes): try: self.object.put(Body=put_data) self.object.wait_until_exists() - logging.info( + logging.debug( "버킷 '%s'에 '%s'를 저장합니다.", self.object.bucket_name, self.object.key, diff --git a/backend/AnimatedDrawings/application/s3/s3_connection.py b/backend/AnimatedDrawings/application/s3/s3_connection.py index 44cc57bd6..daab531ad 100644 --- a/backend/AnimatedDrawings/application/s3/s3_connection.py +++ b/backend/AnimatedDrawings/application/s3/s3_connection.py @@ -4,12 +4,10 @@ from application.config.s3_config import S3Config from application.s3.object_wrapper import ObjectWrapper -s3_config = S3Config() - def s3_resource(): - return boto3.resource('s3', aws_access_key_id=s3_config.access_key, - aws_secret_access_key=s3_config.secret_key, + return boto3.resource('s3', aws_access_key_id=S3Config.ACCESS_KEY_ID, + aws_secret_access_key=S3Config.SECRET_ACCESS_KEY, config=Config(signature_version='s3v4')) diff --git a/backend/AnimatedDrawings/application/task/base_task.py b/backend/AnimatedDrawings/application/task/base_task.py index 04783395c..22595acfe 100644 --- a/backend/AnimatedDrawings/application/task/base_task.py +++ b/backend/AnimatedDrawings/application/task/base_task.py @@ -1,12 +1,15 @@ -import logging +import json -import requests +import celery.signals from celery import Task +from celery.utils.log import get_task_logger +from kombu import Queue, Exchange -from application.model.notification_status import \ - NotificationStatus - -logger = logging.getLogger('error_task') +from application.config.logger_config import LoggerConfig +from application.config.queue_config import QueueConfig +from application.kombu_connection_pool import producers, connection +from application.logging.logger_factory import LoggerFactory +from application.model.notification_status import NotificationStatus class LogErrorsTask(Task): @@ -15,33 +18,50 @@ class LogErrorsTask(Task): retry_backoff = True retry_backoff_max = 700 retry_jitter = False - notification_server_url = 'https://notification.archive-timecapsule.kro.kr/api/notification/capsule_skin/send' + + def __init__(self): + self.task_logger = get_task_logger(__name__) + + @celery.signals.after_setup_task_logger.connect + def on_after_setup_logger(logger, **kwargs): + LoggerFactory.setup_logger(logger, LoggerConfig.CELERY_OUTPUT_FILE_PATH) + + def before_start(self, task_id, args, kwargs): + self.task_logger.debug(kwargs) + self.task_logger.debug('태스크 처리 시작 %s', task_id) def on_failure(self, exc, task_id, args, kwargs, einfo): - logger.error('태스크 처리 실패 %s', task_id) - request_data = { + self.task_logger.exception('태스크 처리 실패 %s', task_id, exc_info=einfo) + request_data = json.dumps({ 'memberId': kwargs['input_data']['memberId'], 'skinName': kwargs['input_data']['skinName'], 'title': '캡슐 스킨 생성에 실패했습니다', 'text': f"{kwargs['input_data']['skinName']}이 생성되지 않았습니다. 다시 한 번 시도해주세요!", 'skinUrl': kwargs['filename'], - 'status': NotificationStatus.SUCCESS.value - } + 'status': NotificationStatus.FAIL.value + }, ensure_ascii=False) + with producers[connection].acquire(block=True) as producer: + exchange = Exchange(name=QueueConfig.NOTIFICATION_EXCHANGE_NAME, + type='direct', + durable=True) - try: - r = requests.post(self.notification_server_url, - json=request_data, - verify=False, - timeout=5) - r.raise_for_status() - except requests.exceptions.HTTPError as ex: - logger.error('알림 서버 동작 오류 request: %s, response: %s', ex.request, ex.response) + queue = Queue(name=QueueConfig.NOTIFICATION_QUEUE_NAME, + exchange=exchange, + routing_key=QueueConfig.NOTIFICATION_QUEUE_NAME) - super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo) + producer.publish( + request_data, + declare=[queue], + exchange=exchange, + content_type='application/json', + routing_key=QueueConfig.NOTIFICATION_QUEUE_NAME, + ) def on_retry(self, exc, task_id, args, kwargs, einfo): - logger.error('태스크 재시도 %s', task_id) - super(LogErrorsTask, self).on_retry(exc, task_id, args, kwargs, einfo) + self.task_logger.debug(kwargs) + self.task_logger.exception('태스크 재시도 %s', task_id, exc_info=einfo) def on_success(self, retval, task_id, args, kwargs): - logger.info('태스크 처리 성공 %s', task_id) + self.task_logger.debug(args) + + self.task_logger.debug('태스크 처리 성공 %s', task_id) diff --git a/backend/AnimatedDrawings/application/task/make_animation.py b/backend/AnimatedDrawings/application/task/make_animation.py deleted file mode 100644 index 431b13cab..000000000 --- a/backend/AnimatedDrawings/application/task/make_animation.py +++ /dev/null @@ -1,41 +0,0 @@ -import os -import shutil -from pathlib import Path - -import requests - -from application.config.s3_config import S3Config -from application.s3.s3_connection import get_object_wrapper -from application.task.base_task import LogErrorsTask -from examples.annotations_to_animation import annotations_to_animation -from examples.image_to_annotations import image_to_annotations - - -class MakeAnimation(LogErrorsTask): - name = 'make_animation' - - def __init__(self): - self.s3_bucket_name = S3Config().s3_bucket_name - - def run(self, *args, **kwargs): - img_bytes = requests.get(kwargs['input_data']['imageUrl']).content - - output_directory = 'capsuleSkin/' + kwargs['input_data']['memberId'] - result = Path(output_directory) - result.mkdir(exist_ok=True) - - image_to_annotations(img_bytes, result) - annotations_to_animation(output_directory, - kwargs['input_data']['motionName'], - kwargs['input_data']['retarget']) - - with open(output_directory + '/video.gif', 'rb') as image: - gif_bytes = bytearray(image.read()) - - output_wrapper = get_object_wrapper(self.s3_bucket_name, - '%s/%s' % (output_directory, - kwargs['filename'])) - output_wrapper.put(gif_bytes) - - if os.path.exists(output_directory): - shutil.rmtree(output_directory) diff --git a/backend/AnimatedDrawings/application/task/save_capsule_skin.py b/backend/AnimatedDrawings/application/task/save_capsule_skin.py deleted file mode 100644 index 481d41d34..000000000 --- a/backend/AnimatedDrawings/application/task/save_capsule_skin.py +++ /dev/null @@ -1,29 +0,0 @@ -from sqlalchemy import create_engine -from sqlalchemy.orm import Session - -from application.config.database_config import DatabaseConfig -from application.task.base_task import LogErrorsTask -from application.model.capsule_skin import CapsuleSkin -from application.model.motion import Motion -from application.model.retarget import Retarget - - -class SaveCapsuleSkin(LogErrorsTask): - name = 'save_capsule_skin' - database_config = DatabaseConfig() - engine = create_engine(database_config.get_database_url()) - - def run(self, *args, **kwargs): - capsule_skin = CapsuleSkin(skin_name=kwargs['input_data']['skinName'], - image_url='capsuleSkin/%s/%s' % ( - kwargs['input_data']['memberId'], - kwargs['filename']), - motion_name=Motion( - kwargs['input_data']['motionName']).name, - retarget=Retarget( - kwargs['input_data']['retarget']).name, - member_id=kwargs['input_data']['memberId']) - - with Session(self.engine) as session: - session.add(capsule_skin) - session.commit() diff --git a/backend/AnimatedDrawings/application/task/send_notification.py b/backend/AnimatedDrawings/application/task/send_notification.py deleted file mode 100644 index 88e6e5a31..000000000 --- a/backend/AnimatedDrawings/application/task/send_notification.py +++ /dev/null @@ -1,32 +0,0 @@ -import logging - -import requests - -from application.model.notification_status import \ - NotificationStatus -from application.task.base_task import LogErrorsTask - -logger = logging.getLogger('send_notification') - - -class SendNotification(LogErrorsTask): - name = 'send_notification' - - def run(self, *args, **kwargs): - request_data = { - 'memberId': kwargs['input_data']['memberId'], - 'skinName': kwargs['input_data']['skinName'], - 'title': '캡슐 스킨 생성이 완료되었습니다', - 'text': f"{kwargs['input_data']['skinName']}이 생성되었습니다. ARchive에서 확인해보세요!", - 'skinUrl': kwargs['filename'], - 'status': NotificationStatus.SUCCESS.value - } - - try: - r = requests.post(self.notification_server_url, - json=request_data, - verify=False, - timeout=5) - r.raise_for_status() - except requests.exceptions.HTTPError as ex: - logger.error('알림 서버 동작 오류 request: %s, response: %s', ex.request, ex.response) diff --git a/backend/AnimatedDrawings/application/task/tasks.py b/backend/AnimatedDrawings/application/task/tasks.py new file mode 100644 index 000000000..67737b214 --- /dev/null +++ b/backend/AnimatedDrawings/application/task/tasks.py @@ -0,0 +1,114 @@ +import json +import os +import shutil +import uuid +from pathlib import Path + +import requests +from kombu import Exchange, Queue +from sqlalchemy import create_engine +from sqlalchemy.orm import Session + +from application.celery_app import celery +from application.config.database_config import DatabaseConfig +from application.config.queue_config import QueueConfig +from application.config.s3_config import S3Config +from application.kombu_connection_pool import producers, connection +from application.model.capsule_skin import CapsuleSkin +from application.model.motion import Motion +from application.model.notification_status import NotificationStatus +from application.model.retarget import Retarget +from application.s3.s3_connection import get_object_wrapper +from application.task.base_task import LogErrorsTask +from examples.annotations_to_animation import annotations_to_animation +from examples.image_to_annotations import image_to_annotations + +engine = create_engine(DatabaseConfig.get_database_url()) +s3_bucket_name = S3Config.S3_BUCKET_NAME + + +@celery.task(base=LogErrorsTask) +def create_animation(input_data: dict, filename: str): + """ + 애니메이션 생성 task + :param input_data: 입력 데이터(dict) - imageUrl, motionName, retarget, skinName, memberId, memberName + :param filename: 원격지에 저장될 파일 이름 ex) capsuleSkin/2/1234.gif + :return: + """ + img_bytes = requests.get(input_data['imageUrl']).content + + temporary_directory = f'capsuleSkin/{uuid.uuid4()}' + result = Path(temporary_directory) + result.mkdir(exist_ok=True) + + image_to_annotations(img_bytes, result) + annotations_to_animation(temporary_directory, + input_data['motionName'], + input_data['retarget']) + + with open(f'{temporary_directory}/video.gif', 'rb') as image: + gif_bytes = bytearray(image.read()) + + output_wrapper = get_object_wrapper(s3_bucket_name, filename) + + output_wrapper.put(gif_bytes) + + if os.path.exists(temporary_directory): + shutil.rmtree(temporary_directory) + + +@celery.task(base=LogErrorsTask) +def save_capsule_skin(_, input_data: dict, filename: str): + """ + 캡슐 스킨 생성 정보 DB 저장 태스크 + :param _: 이전 task 결과 + :param input_data: 입력 데이터(dict) - imageUrl, motionName, retarget, skinName, memberId, memberName + :param filename: 원격지에 저장될 파일 이름 ex) capsuleSkin/2/1234.gif + :return: + """ + capsule_skin = CapsuleSkin(skin_name=input_data['skinName'], + image_url=filename, + motion_name=Motion( + input_data['motionName']).name, + retarget=Retarget(input_data['retarget']).name, + member_id=input_data['memberId']) + + with Session(engine) as session: + session.add(capsule_skin) + session.commit() + + +@celery.task(base=LogErrorsTask) +def send_notification(_, input_data: dict, filename: str): + """ + 캡슐 스킨 생성 완료 알림 전송 태스크 + :param _: 이전 task 결과 + :param input_data: 입력 데이터(dict) - imageUrl, motionName, retarget, skinName, memberId, memberName + :param filename: 원격지에 저장될 파일 이름 ex) capsuleSkin/2/1234.gif + :return: + """ + request_data = json.dumps({ + 'memberId': input_data['memberId'], + 'skinName': input_data['skinName'], + 'title': '캡슐 스킨 생성이 완료되었습니다', + 'text': f"{input_data['skinName']}이 생성되었습니다. ARchive에서 확인해보세요!", + 'skinUrl': filename, + 'status': NotificationStatus.SUCCESS.value + }, ensure_ascii=False) + + with producers[connection].acquire(block=True) as producer: + exchange = Exchange(name=QueueConfig.NOTIFICATION_EXCHANGE_NAME, + type='direct', + durable=True) + + queue = Queue(name=QueueConfig.NOTIFICATION_QUEUE_NAME, + exchange=exchange, + routing_key=QueueConfig.NOTIFICATION_QUEUE_NAME) + + producer.publish( + request_data, + declare=[queue], + exchange=exchange, + content_type='application/json', + routing_key=QueueConfig.NOTIFICATION_QUEUE_NAME, + ) diff --git a/backend/AnimatedDrawings/examples/image_to_annotations.py b/backend/AnimatedDrawings/examples/image_to_annotations.py index 6c1a95278..ac46569c0 100644 --- a/backend/AnimatedDrawings/examples/image_to_annotations.py +++ b/backend/AnimatedDrawings/examples/image_to_annotations.py @@ -49,7 +49,7 @@ def image_to_annotations(img_bytes: bytes, outdir: Path) -> None: # convert to bytes and send to torchserve img_b = cv2.imencode('.png', img)[1].tobytes() request_data = {'data': img_b} - resp = requests.post("http://%s:8080/predictions/drawn_humanoid_detector" % config.torchserve_host, files=request_data, verify=False) + resp = requests.post(f'http://{TorchserveConfig.TORCHSERVE_HOST}:8080/predictions/drawn_humanoid_detector', files=request_data, verify=False) if resp is None or resp.status_code >= 300: raise Exception(f"Failed to get bounding box, please check if the 'docker_torchserve' is running and healthy, resp: {resp}") @@ -93,7 +93,7 @@ def image_to_annotations(img_bytes: bytes, outdir: Path) -> None: # send cropped image to pose estimator data_file = {'data': cv2.imencode('.png', cropped)[1].tobytes()} - resp = requests.post("http://%s:8080/predictions/drawn_humanoid_pose_estimator" % config.torchserve_host, files=data_file, verify=False) + resp = requests.post(f'http://{TorchserveConfig.TORCHSERVE_HOST}:8080/predictions/drawn_humanoid_pose_estimator', files=data_file, verify=False) if resp is None or resp.status_code >= 300: raise Exception(f"Failed to get skeletons, please check if the 'docker_torchserve' is running and healthy, resp: {resp}") diff --git a/backend/AnimatedDrawings/supervisord.conf b/backend/AnimatedDrawings/supervisord.conf index cf4da2daa..f4bdfd4a2 100644 --- a/backend/AnimatedDrawings/supervisord.conf +++ b/backend/AnimatedDrawings/supervisord.conf @@ -1,5 +1,6 @@ [supervisord] nodaemon=true +loglevel=info [inet_http_server] port = *:9001 @@ -8,14 +9,16 @@ password = 1234 [program:flower] directory = /app/application -command = /opt/conda/bin/conda run -n animated_drawings celery -A celery_app flower --conf="/app/application/config/flowerconfig.py" +command = /opt/conda/envs/animated_drawings/bin/celery -A celery_app flower --conf="/app/application/config/flowerconfig.py" priority = 100 +loglevel=info stdout_logfile = /var/log/flower.log stderr_logfile = /var/log/flower.err [program:application] directory = /app/application -command = /opt/conda/bin/conda run -n animated_drawings python3 animation_queue.py +command = /opt/conda/envs/animated_drawings/bin/python3 -u animation_queue.py +loglevel=info priority = 100 stdout_logfile = /var/log/application.log stderr_logfile = /var/log/application.err @@ -23,8 +26,9 @@ environment=ENVIRONMENT=%(ENV_ENVIRONMENT)s,PYOPENGL_PLATFORM="osmesa" [program:celery] directory = /app/application -command = /opt/conda/bin/conda run -n animated_drawings celery -A celery_app worker +command = /opt/conda/envs/animated_drawings/bin/celery -A celery_app worker priority = 200 +loglevel=info stdout_logfile = /var/log/celeryd.log stderr_logfile = /var/log/celeryd.err environment=ENVIRONMENT=%(ENV_ENVIRONMENT)s,PYOPENGL_PLATFORM="osmesa" diff --git a/backend/AnimatedDrawings/tests/test_animation_queue.py b/backend/AnimatedDrawings/tests/test_animation_queue.py index 937e89223..4f34921a6 100644 --- a/backend/AnimatedDrawings/tests/test_animation_queue.py +++ b/backend/AnimatedDrawings/tests/test_animation_queue.py @@ -29,7 +29,7 @@ def test_parse_json_valid(self): "motionName": Motion.DAB.value } - result = self.controller.parse_json(test_json) + result = self.controller.parse_body(test_json) self.assertEqual(result, expected_result) def test_parse_json_invalid_enum(self): @@ -43,7 +43,7 @@ def test_parse_json_invalid_enum(self): }).encode('utf-8') with self.assertRaises(KeyError): - self.controller.parse_json(test_json) + self.controller.parse_body(test_json) def test_parse_json_invalid_format(self): test_json = """{"memberId": 123, @@ -55,7 +55,7 @@ def test_parse_json_invalid_format(self): """.encode('utf-8') with self.assertRaises(json.JSONDecodeError): - self.controller.parse_json(test_json) + self.controller.parse_body(test_json) # 테스트 실행