Skip to content

Commit

Permalink
Merge pull request #385 from tukcomCD2024/develop_back_animation
Browse files Browse the repository at this point in the history
feat : develop_back_animation -> develop_back
  • Loading branch information
seokho-1116 authored May 2, 2024
2 parents f3e965f + bd467ab commit e0e5ff6
Show file tree
Hide file tree
Showing 24 changed files with 415 additions and 297 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
[submodule "backend/notification/src/main/resources/config"]
path = backend/notification/src/main/resources/config
url = https://github.com/seokho-1116/DroidBlossom-config-notification.git
[submodule "backend/AnimatedDrawings/application/config/yml"]
path = backend/AnimatedDrawings/application/config/yml
url = https://github.com/seokho-1116/DroidBlosson-config-animation.git
2 changes: 2 additions & 0 deletions backend/AnimatedDrawings/application/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import celery_app
__all__ = ('celery_app',)
143 changes: 69 additions & 74 deletions backend/AnimatedDrawings/application/animation_queue.py
Original file line number Diff line number Diff line change
@@ -1,122 +1,117 @@
import json
import logging
import uuid
from _xxsubinterpreters import ChannelClosedError
from json.decoder import JSONDecodeError
from ast import literal_eval

import pika
from celery import chain
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic, BasicProperties
from kombu import Exchange, Queue

from application.config.queue_config import QueueConfig
from application.logging.logger_factory import LoggerFactory
from application.model.motion import Motion
from application.model.retarget import Retarget
from application.task.make_animation import MakeAnimation
from application.task.save_capsule_skin import SaveCapsuleSkin
from application.task.send_notification import SendNotification

logger = logging.getLogger('animation_queue_controller')
from application.task.tasks import create_animation, save_capsule_skin, \
send_notification
from kombu_connection_pool import connection, connections


class AnimationQueueController:
def __init__(self):
self.queue_config = QueueConfig()
self.require_keys = ['memberId', 'memberName', 'skinName', 'imageUrl',
'retarget', 'motionName']
self.celery_work_queue_name = 'makeAnimation.queue'
self.celery_success_queue_name = 'saveCapsuleSkin.queue'
self.celery_send_notification_queue_name = 'sendNotification.queue'
self.make_animation_task = MakeAnimation()
self.save_capsule_skin_task = SaveCapsuleSkin()
self.send_notification_task = SendNotification()
self.celery_work_queue_name = 'task.makeAnimation.queue'
self.celery_success_queue_name = 'task.saveCapsuleSkin.queue'
self.celery_send_notification_queue_name = 'task.sendNotification.queue'
self.logger = LoggerFactory.get_logger(__name__)

def run(self):
# rabbitmq 채널 연결
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.queue_config.queue_host))
channel = connection.channel()

channel.queue_declare(queue=self.queue_config.queue_name, durable=True)

channel.basic_consume(queue=self.queue_config.queue_name,
on_message_callback=self.callback,
auto_ack=False)

try:
channel.start_consuming()
except ChannelClosedError as e:
logger.info("커넥션 연결 오류")
raise e
finally:
channel.close()
"""
큐 메시지 리스닝
:return:
"""
capsule_skin_exchange = Exchange(
name=QueueConfig.CAPSULE_SKIN_REQUEST_EXCHANGE_NAME,
type='direct',
durable=True)
capsule_skin_queue = Queue(
name=QueueConfig.CAPSULE_SKIN_REQUEST_QUEUE_NAME,
exchange=capsule_skin_exchange,
routing_key=QueueConfig.CAPSULE_SKIN_REQUEST_QUEUE_NAME)

with connections[connection].acquire(block=True) as conn:
with conn.Consumer(queues=[capsule_skin_queue],
callbacks=[self.callback],
accept=['json']):
self.logger.info('메시지 수신 시작')
while True:
conn.drain_events()

def callback(
self,
channel: BlockingChannel,
method: Basic.Deliver,
header: BasicProperties,
body: bytes,
body: [dict, str, bytes],
message: any
) -> None:
"""
queue에서 message consume 시 동작하는 callback
celery worker한테 animation 생성 작업을 실행한다
:param channel: 큐와 연결된 채널
:param method: 메시지의 상태
:param header: 기본 정보
:param body: queue로부터 넘어온 데이터
큐에 메시지가 도착했을 때 동작하는 콜백 함수
:param body:
:param message:
:return:
"""
logger.info('큐 메시지 처리 시작 %s', header.message_id)
try:
json_object = self.parse_json(body)
self.logger.debug('메시지 수신 완료, 콜백 동작')
parsed_data = self.parse_body(body)

filename = f"capsuleSkin/{json_object['memberId']}/{uuid.uuid4()}.gif"
filename = f"capsuleSkin/{parsed_data['memberId']}/{uuid.uuid4()}.gif"

chain(
self.make_animation_task.s(input_data=json_object,
filename=filename)
create_animation.s(input_data=parsed_data,
filename=filename)
.set(queue=self.celery_work_queue_name),

self.save_capsule_skin_task.s(input_data=json_object,
filename=filename)
save_capsule_skin.s(input_data=parsed_data,
filename=filename)
.set(queue=self.celery_success_queue_name),

self.send_notification_task.s(input_data=json_object,
filename=filename)
send_notification.s(input_data=parsed_data,
filename=filename)
.set(queue=self.celery_send_notification_queue_name)
).apply_async(
ignore_result=True
)

channel.basic_ack(delivery_tag=method.delivery_tag)
message.ack()
self.logger.debug('celery에 작업 전달 완료')
except Exception as e:
logger.exception('메시지 처리 오류', e)
channel.basic_reject(delivery_tag=method.delivery_tag,
requeue=False)
self.logger.exception('작업 큐 메시지 처리 오류 %r', e)
message.reject()

def parse_json(self, body: bytes):
def parse_body(self, body: [dict, str, bytes]) -> dict:
"""
json bytes를 파싱해 dict로 반환하는 함수
:param body: queue로부터 넘어온 json bytes
:return: body에서 파싱된 dict
:raises JSONDecodeError: 유효하지 않은 json 형태인 경우
:raise TypeError: 잘못된 json 입력 타입인 경우
큐로부터 온 메시지 파싱 함수
:param body: 큐에서 온 메시지
:return: 파싱된 메시지 dict
"""
try:
json_object = json.loads(body.decode(encoding='utf8'))
json_object['memberId'] = str(json_object['memberId'])
json_object['retarget'] = Retarget[json_object['retarget']].value
json_object['motionName'] = Motion[json_object['motionName']].value
if isinstance(body, str):
dict_data = literal_eval(body)
elif isinstance(body, dict):
dict_data = body
elif isinstance(body, bytes):
dict_data = literal_eval(body.decode('utf-8'))
else:
self.logger.error('처리할 수 없는 타입 오류')
raise TypeError('처리할 수 없는 타입입니다')

dict_data['retarget'] = Retarget[dict_data['retarget']].value
dict_data['motionName'] = Motion[dict_data['motionName']].value

for key in self.require_keys:
if key not in json_object:
if key not in dict_data:
raise KeyError

return json_object
return dict_data

except (JSONDecodeError, KeyError, TypeError) as e:
logger.exception('json 파싱 오류', e)
except (KeyError, TypeError) as e:
self.logger.exception('작업 큐 메시지 파싱 오류 %r', e)
raise e


Expand Down
19 changes: 6 additions & 13 deletions backend/AnimatedDrawings/application/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,15 @@
from kombu import Queue

from application.config.queue_config import QueueConfig
from application.task.make_animation import MakeAnimation
from application.task.save_capsule_skin import SaveCapsuleSkin
from application.task.send_notification import SendNotification

queue_config = QueueConfig()
celery = Celery('application',
broker=queue_config.get_queue_url(),
include=['application.task'])
celery = Celery('application.task',
broker=queue_config.get_celery_broker_url(),
include=['application.task.tasks'])

celery.conf.result_expires = 300
celery.conf.task_queues = (
Queue('makeAnimation.queue'),
Queue('saveCapsuleSkin.queue'),
Queue('sendNotification.queue')
Queue('task.makeAnimation.queue'),
Queue('task.saveCapsuleSkin.queue'),
Queue('task.sendNotification.queue')
)

celery.register_task(MakeAnimation())
celery.register_task(SaveCapsuleSkin())
celery.register_task(SendNotification())
29 changes: 16 additions & 13 deletions backend/AnimatedDrawings/application/config/database_config.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
from urllib.parse import urlparse, quote_plus
from sqlalchemy import URL

from application.config.root_config import RootConfig


class DatabaseConfig(RootConfig):
def __init__(self):
self._db_username = self._config_file['spring']['datasource'][
'username']
self._db_password = self._config_file['spring']['datasource'][
'password']
self._db_url = urlparse(
self._config_file['spring']['datasource']['url'])
class DatabaseConfig:
USER_NAME = RootConfig.CONFIG_FILE['database']['username']
PASSWORD = RootConfig.CONFIG_FILE['database']['password']
DB_URL = RootConfig.CONFIG_FILE['database']['url']
DB_NAME = RootConfig.CONFIG_FILE['database']['database_name']

def get_database_url(self) -> str:
url = self._db_url.path.split("//")[1]
return 'mysql+pymysql://%s:%s@%s' % (
self._db_username, quote_plus(self._db_password), url)
@staticmethod
def get_database_url() -> URL:
url_object = URL.create(
"mysql+pymysql",
username=DatabaseConfig.USER_NAME,
password=DatabaseConfig.PASSWORD,
host=DatabaseConfig.DB_URL,
database=DatabaseConfig.DB_NAME
)
return url_object
18 changes: 4 additions & 14 deletions backend/AnimatedDrawings/application/config/flowerconfig.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,9 @@
from application.config.root_config import RootConfig


class FlowerConfig(RootConfig):
def __init__(self):
self._username = self._config_file['flower']['username']
self._password = self._config_file['flower']['password']
class FlowerConfig:
USERNAME = RootConfig.CONFIG_FILE['flower']['username']
PASSWORD = RootConfig.CONFIG_FILE['flower']['password']

@property
def username(self):
return self._username

@property
def password(self):
return self._password


config = FlowerConfig()
basic_auth = "%s:%s" % (config.username, config.password)
basic_auth = f'{FlowerConfig.USERNAME}:{FlowerConfig.PASSWORD}'
10 changes: 10 additions & 0 deletions backend/AnimatedDrawings/application/config/logger_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from application.config.root_config import RootConfig


class LoggerConfig:
LOGGING_LEVEL = RootConfig.CONFIG_FILE['logging']['level']
CELERY_OUTPUT_FILE_PATH = RootConfig.CONFIG_FILE['logging'][
'celery_output_file_path']
APPLICATION_OUTPUT_FILE_PATH = RootConfig.CONFIG_FILE['logging'][
'application_output_file_path']
FORMAT_STRING = RootConfig.CONFIG_FILE['logging']['format']
41 changes: 22 additions & 19 deletions backend/AnimatedDrawings/application/config/queue_config.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
from application.config.root_config import RootConfig


class QueueConfig(RootConfig):
def __init__(self):
self._queue_username = self._config_file['spring']['rabbitmq']['username']
self._queue_host = self._config_file['spring']['rabbitmq']['host']
self._queue_password = self._config_file['spring']['rabbitmq']['password']
self._queue_virtual_host = self._config_file['spring']['rabbitmq']['virtual-host']
self._queue_name = 'capsuleSkin_delay.queue'
class QueueConfig:
USERNAME = RootConfig.CONFIG_FILE['rabbitmq']['username']
BROKER_HOST = RootConfig.CONFIG_FILE['rabbitmq']['host']
PASSWORD = RootConfig.CONFIG_FILE['rabbitmq']['password']
VIRTUAL_HOST = RootConfig.CONFIG_FILE['rabbitmq']['virtual-host']
CAPSULE_SKIN_REQUEST_QUEUE_NAME = RootConfig.CONFIG_FILE['rabbitmq'][
'queue_name']
CAPSULE_SKIN_REQUEST_EXCHANGE_NAME = RootConfig.CONFIG_FILE['rabbitmq'][
'exchange_name']
NOTIFICATION_EXCHANGE_NAME = RootConfig.CONFIG_FILE['rabbitmq'][
'notification_exchange_name']
NOTIFICATION_QUEUE_NAME = RootConfig.CONFIG_FILE['rabbitmq'][
'notification_queue_name']

def get_queue_url(self) -> str:
return 'pyamqp://%s:%s@%s:5672%s' % (self._queue_username,
self._queue_password,
self._queue_host,
self._queue_virtual_host)
@staticmethod
def get_celery_broker_url() -> str:
return 'pyamqp://%s:%s@%s:5672%s' % (QueueConfig.USERNAME,
QueueConfig.PASSWORD,
QueueConfig.BROKER_HOST,
QueueConfig.VIRTUAL_HOST)

@property
def queue_name(self) -> str:
return self._queue_name

@property
def queue_host(self) -> str:
return self._queue_host
@staticmethod
def get_kombu_broker_url() -> str:
return f'amqp://{QueueConfig.USERNAME}:{QueueConfig.PASSWORD}@{QueueConfig.BROKER_HOST}/{QueueConfig.VIRTUAL_HOST}'
9 changes: 6 additions & 3 deletions backend/AnimatedDrawings/application/config/root_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import yaml

environment = os.environ.get('ENVIRONMENT', 'local')
APPLICATION_YAML_PATH = f'config/yaml/application-{environment}.yml'
APPLICATION_YAML_PATH = f'config/yml/config-{environment}.yml'


class RootConfig:
with open(APPLICATION_YAML_PATH) as file:
_config_file = yaml.safe_load(file)
CONFIG_FILE = None

if CONFIG_FILE is None:
with open(APPLICATION_YAML_PATH) as file:
CONFIG_FILE = yaml.safe_load(file)
19 changes: 3 additions & 16 deletions backend/AnimatedDrawings/application/config/s3_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,6 @@


class S3Config(RootConfig):
def __init__(self):
self._access_key_id = self._config_file['s3']['accessKey']
self._secret_access_key = self._config_file['s3']['secretKey']
self._s3_bucket_name = self._config_file['s3']['bucket']

@property
def access_key(self) -> str:
return self._access_key_id

@property
def secret_key(self) -> str:
return self._secret_access_key

@property
def s3_bucket_name(self) -> str:
return self._s3_bucket_name
ACCESS_KEY_ID = RootConfig.CONFIG_FILE['s3']['accessKey']
SECRET_ACCESS_KEY = RootConfig.CONFIG_FILE['s3']['secretKey']
S3_BUCKET_NAME = RootConfig.CONFIG_FILE['s3']['bucket']
Loading

0 comments on commit e0e5ff6

Please sign in to comment.