Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/ted4 55 #497

Merged
merged 8 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions dags/operators/DagBatchPipelineOperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ def single_notice_processor(self, notice_id: str, notice_repository: NoticeRepos
result_notice_pipeline = self.notice_pipeline_callable(notice, notice_repository.mongodb_client)
if result_notice_pipeline.store_result:
notice_repository.update(notice=result_notice_pipeline.notice)
notice_event.end_record()
if result_notice_pipeline.processed:
processed_notice_id = notice_id
notice_event.end_record()
if notice.normalised_metadata:
notice_event.notice_form_number = notice.normalised_metadata.form_number
notice_event.notice_eforms_subtype = notice.normalised_metadata.eforms_subtype
notice_event.notice_status = str(notice.status)
logger.info(event_message=notice_event)
if notice.normalised_metadata:
notice_event.notice_form_number = notice.normalised_metadata.form_number
notice_event.notice_eforms_subtype = notice.normalised_metadata.eforms_subtype
notice_event.notice_status = str(notice.status)
logger.info(event_message=notice_event)
error_message = result_notice_pipeline.error_message
except Exception as exception_error_message:
error_message = str(exception_error_message)
Expand Down
2 changes: 1 addition & 1 deletion ted_sws/core/model/notice.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ def update_status_to(self, new_status: NoticeStatus):
:return:
"""
if type(new_status) is not NoticeStatus:
raise ValueError(f"Status must be a NoticeStatus")
raise ValueError("Status must be a NoticeStatus")

if self._status < new_status:
if new_status in NOTICE_STATUS_DOWNSTREAM_TRANSITION[self._status]:
Expand Down
5 changes: 3 additions & 2 deletions ted_sws/data_manager/adapters/manifestation_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from ted_sws.data_manager.adapters.repository_abc import ManifestationRepositoryABC

MONGODB_COLLECTION_ID = "_id"
FILE_STORAGE_COLLECTION_NAME = "fs.files"
MANIFESTATION_GRID_FS_COLLECTION_NAME = "manifestations_fs"
FILE_STORAGE_COLLECTION_NAME = f"{MANIFESTATION_GRID_FS_COLLECTION_NAME}.files"
MANIFESTATION_ID = "manifestation_id"
OBJECT_DATA_KEY = "object_data"
AGGREGATE_REFERENCE_ID = "ted_id"
Expand All @@ -26,7 +27,7 @@ def __init__(self, mongodb_client: MongoClient, database_name: str = None):
self._database_name = database_name
self.mongodb_client = mongodb_client
db = mongodb_client[self._database_name]
self.file_storage = gridfs.GridFS(db) # TODO: Investigate how it works in multiple processes in parallel.
self.file_storage = gridfs.GridFS(db, collection=MANIFESTATION_GRID_FS_COLLECTION_NAME) # TODO: Investigate how it works in multiple processes in parallel.
self.collection = db[self._collection_name]
self.collection.create_index([(AGGREGATE_REFERENCE_ID, ASCENDING)])
self.file_storage_collection = db[FILE_STORAGE_COLLECTION_NAME]
Expand Down
5 changes: 3 additions & 2 deletions ted_sws/event_manager/model/event_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pydantic import Field

from ted_sws.core.model import PropertyBaseModel, BaseModel
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.log import SeverityLevelType

DictType = Union[Dict[str, Any], None]
Expand All @@ -14,7 +15,7 @@
"""


class EventMessageProcessType(Enum):
class EventMessageProcessType(str, Enum):
"""
Event message process type.
"""
Expand Down Expand Up @@ -111,7 +112,7 @@ class NoticeEventMessage(EventMessage):
domain_action: Optional[str] = None
notice_form_number: Optional[str] = None
notice_eforms_subtype: Optional[str] = None
notice_status: Optional[str] = None
notice_status: Optional[NoticeStatus] = None


class MappingSuiteEventMessage(EventMessage):
Expand Down