From 9c4bdda1776e851df638337f45230748974f3675 Mon Sep 17 00:00:00 2001 From: CaptainOfHacks <39195263+CaptainOfHacks@users.noreply.github.com> Date: Mon, 25 Sep 2023 22:02:03 +0300 Subject: [PATCH 1/8] Update event_message.py --- ted_sws/event_manager/model/event_message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ted_sws/event_manager/model/event_message.py b/ted_sws/event_manager/model/event_message.py index 4fd990465..d754a1aa0 100644 --- a/ted_sws/event_manager/model/event_message.py +++ b/ted_sws/event_manager/model/event_message.py @@ -14,7 +14,7 @@ """ -class EventMessageProcessType(Enum): +class EventMessageProcessType(str, Enum): """ Event message process type. """ From 2c26e0d489f842d5501b154c6c0ed22ac4e32eb6 Mon Sep 17 00:00:00 2001 From: CaptainOfHacks <39195263+CaptainOfHacks@users.noreply.github.com> Date: Mon, 25 Sep 2023 22:07:20 +0300 Subject: [PATCH 2/8] Update event_message.py --- ted_sws/event_manager/model/event_message.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ted_sws/event_manager/model/event_message.py b/ted_sws/event_manager/model/event_message.py index d754a1aa0..77f5bd97b 100644 --- a/ted_sws/event_manager/model/event_message.py +++ b/ted_sws/event_manager/model/event_message.py @@ -5,6 +5,7 @@ from pydantic import Field from ted_sws.core.model import PropertyBaseModel, BaseModel +from ted_sws.core.model.notice import NoticeStatus from ted_sws.event_manager.adapters.log import SeverityLevelType DictType = Union[Dict[str, Any], None] @@ -111,7 +112,7 @@ class NoticeEventMessage(EventMessage): domain_action: Optional[str] = None notice_form_number: Optional[str] = None notice_eforms_subtype: Optional[str] = None - notice_status: Optional[str] = None + notice_status: Optional[NoticeStatus] = None class MappingSuiteEventMessage(EventMessage): From e0f85c718232d58e5e6ce72c51ea408e621ade0b Mon Sep 17 00:00:00 2001 From: CaptainOfHacks <39195263+CaptainOfHacks@users.noreply.github.com> Date: Mon, 25 Sep 2023 22:19:10 +0300 Subject: [PATCH 3/8] Update DagBatchPipelineOperator.py --- dags/operators/DagBatchPipelineOperator.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/dags/operators/DagBatchPipelineOperator.py b/dags/operators/DagBatchPipelineOperator.py index 3d2120940..cab07c581 100644 --- a/dags/operators/DagBatchPipelineOperator.py +++ b/dags/operators/DagBatchPipelineOperator.py @@ -70,15 +70,16 @@ def single_notice_processor(self, notice_id: str, notice_repository: NoticeRepos result_notice_pipeline = self.notice_pipeline_callable(notice, notice_repository.mongodb_client) if result_notice_pipeline.store_result: notice_repository.update(notice=result_notice_pipeline.notice) + notice_event.end_record() if result_notice_pipeline.processed: processed_notice_id = notice_id - notice_event.end_record() - if notice.normalised_metadata: - notice_event.notice_form_number = notice.normalised_metadata.form_number - notice_event.notice_eforms_subtype = notice.normalised_metadata.eforms_subtype - notice_event.notice_status = str(notice.status) - logger.info(event_message=notice_event) + if notice.normalised_metadata: + notice_event.notice_form_number = notice.normalised_metadata.form_number + notice_event.notice_eforms_subtype = notice.normalised_metadata.eforms_subtype + notice_event.notice_status = str(notice.status) + logger.info(event_message=notice_event) error_message = result_notice_pipeline.error_message + print("ERROR MESSAGE: ", error_message) except Exception as exception_error_message: error_message = str(exception_error_message) if error_message: From b6b10b665bf75356f05ee5f517a39018ea2ab12c Mon Sep 17 00:00:00 2001 From: CaptainOfHacks <39195263+CaptainOfHacks@users.noreply.github.com> Date: Mon, 25 Sep 2023 22:28:26 +0300 Subject: [PATCH 4/8] wip --- dags/operators/DagBatchPipelineOperator.py | 2 +- dags/pipelines/notice_processor_pipelines.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/dags/operators/DagBatchPipelineOperator.py b/dags/operators/DagBatchPipelineOperator.py index cab07c581..f8780118d 100644 --- a/dags/operators/DagBatchPipelineOperator.py +++ b/dags/operators/DagBatchPipelineOperator.py @@ -82,7 +82,7 @@ def single_notice_processor(self, notice_id: str, notice_repository: NoticeRepos print("ERROR MESSAGE: ", error_message) except Exception as exception_error_message: error_message = str(exception_error_message) - if error_message: + if error_message is not None: notice_normalised_metadata = notice.normalised_metadata if notice else None log_notice_error(message=error_message, notice_id=notice_id, domain_action=pipeline_name, notice_form_number=notice_normalised_metadata.form_number if notice_normalised_metadata else None, diff --git a/dags/pipelines/notice_processor_pipelines.py b/dags/pipelines/notice_processor_pipelines.py index a8d57ece2..be8d0230e 100644 --- a/dags/pipelines/notice_processor_pipelines.py +++ b/dags/pipelines/notice_processor_pipelines.py @@ -17,8 +17,11 @@ def notice_normalisation_pipeline(notice: Notice, mongodb_client: MongoClient = indexed_notice = index_notice(notice=notice) try: normalised_notice = normalise_notice(notice=indexed_notice) + normalised_notice.update_status_to(new_status=NoticeStatus.NORMALISED_METADATA) + print("Notice normalisation pipeline succes: ", normalised_notice.ted_id) return NoticePipelineOutput(notice=normalised_notice) except Exception as error_message: + print("Notice normalisation pipeline error: ", normalised_notice.ted_id) return NoticePipelineOutput(notice=indexed_notice, processed=False, store_result=True, error_message=str(error_message)) From e7a3c64f8b50ac2812a681d1f721ef02ec0e5db7 Mon Sep 17 00:00:00 2001 From: CaptainOfHacks <39195263+CaptainOfHacks@users.noreply.github.com> Date: Mon, 25 Sep 2023 22:39:21 +0300 Subject: [PATCH 5/8] wip --- dags/operators/DagBatchPipelineOperator.py | 1 - dags/pipelines/notice_processor_pipelines.py | 2 -- ted_sws/core/model/notice.py | 5 +++++ 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/dags/operators/DagBatchPipelineOperator.py b/dags/operators/DagBatchPipelineOperator.py index f8780118d..f097d8e7c 100644 --- a/dags/operators/DagBatchPipelineOperator.py +++ b/dags/operators/DagBatchPipelineOperator.py @@ -79,7 +79,6 @@ def single_notice_processor(self, notice_id: str, notice_repository: NoticeRepos notice_event.notice_status = str(notice.status) logger.info(event_message=notice_event) error_message = result_notice_pipeline.error_message - print("ERROR MESSAGE: ", error_message) except Exception as exception_error_message: error_message = str(exception_error_message) if error_message is not None: diff --git a/dags/pipelines/notice_processor_pipelines.py b/dags/pipelines/notice_processor_pipelines.py index be8d0230e..78ad5b20e 100644 --- a/dags/pipelines/notice_processor_pipelines.py +++ b/dags/pipelines/notice_processor_pipelines.py @@ -18,10 +18,8 @@ def notice_normalisation_pipeline(notice: Notice, mongodb_client: MongoClient = try: normalised_notice = normalise_notice(notice=indexed_notice) normalised_notice.update_status_to(new_status=NoticeStatus.NORMALISED_METADATA) - print("Notice normalisation pipeline succes: ", normalised_notice.ted_id) return NoticePipelineOutput(notice=normalised_notice) except Exception as error_message: - print("Notice normalisation pipeline error: ", normalised_notice.ted_id) return NoticePipelineOutput(notice=indexed_notice, processed=False, store_result=True, error_message=str(error_message)) diff --git a/ted_sws/core/model/notice.py b/ted_sws/core/model/notice.py index 2a85a9203..edd1f5e52 100644 --- a/ted_sws/core/model/notice.py +++ b/ted_sws/core/model/notice.py @@ -481,6 +481,9 @@ def update_status_to(self, new_status: NoticeStatus): if type(new_status) is not NoticeStatus: raise ValueError(f"Status must be a NoticeStatus") + print("Current status is :", self._status) + print("New status is :", new_status) + if self._status < new_status: if new_status in NOTICE_STATUS_DOWNSTREAM_TRANSITION[self._status]: self._status = new_status @@ -507,3 +510,5 @@ def update_status_to(self, new_status: NoticeStatus): if new_status < NoticeStatus.PACKAGED: self.remove_lazy_field(Notice.mets_manifestation) self._mets_manifestation = None + + print("Updated status is :", self._status) From 996306aee7435917f1ac0c13fae9318e331dde84 Mon Sep 17 00:00:00 2001 From: CaptainOfHacks <39195263+CaptainOfHacks@users.noreply.github.com> Date: Mon, 25 Sep 2023 23:01:32 +0300 Subject: [PATCH 6/8] WIP --- ted_sws/core/model/notice.py | 7 +------ ted_sws/data_manager/adapters/manifestation_repository.py | 5 +++-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/ted_sws/core/model/notice.py b/ted_sws/core/model/notice.py index edd1f5e52..ffa37aa80 100644 --- a/ted_sws/core/model/notice.py +++ b/ted_sws/core/model/notice.py @@ -479,10 +479,7 @@ def update_status_to(self, new_status: NoticeStatus): :return: """ if type(new_status) is not NoticeStatus: - raise ValueError(f"Status must be a NoticeStatus") - - print("Current status is :", self._status) - print("New status is :", new_status) + raise ValueError("Status must be a NoticeStatus") if self._status < new_status: if new_status in NOTICE_STATUS_DOWNSTREAM_TRANSITION[self._status]: @@ -510,5 +507,3 @@ def update_status_to(self, new_status: NoticeStatus): if new_status < NoticeStatus.PACKAGED: self.remove_lazy_field(Notice.mets_manifestation) self._mets_manifestation = None - - print("Updated status is :", self._status) diff --git a/ted_sws/data_manager/adapters/manifestation_repository.py b/ted_sws/data_manager/adapters/manifestation_repository.py index 633e62c2b..e437d095d 100644 --- a/ted_sws/data_manager/adapters/manifestation_repository.py +++ b/ted_sws/data_manager/adapters/manifestation_repository.py @@ -10,7 +10,8 @@ from ted_sws.data_manager.adapters.repository_abc import ManifestationRepositoryABC MONGODB_COLLECTION_ID = "_id" -FILE_STORAGE_COLLECTION_NAME = "fs.files" +MANIFESTATION_GRID_FS_COLLECTION_NAME = "manifestations_fs" +FILE_STORAGE_COLLECTION_NAME = f"{MANIFESTATION_GRID_FS_COLLECTION_NAME}.files" MANIFESTATION_ID = "manifestation_id" OBJECT_DATA_KEY = "object_data" AGGREGATE_REFERENCE_ID = "ted_id" @@ -26,7 +27,7 @@ def __init__(self, mongodb_client: MongoClient, database_name: str = None): self._database_name = database_name self.mongodb_client = mongodb_client db = mongodb_client[self._database_name] - self.file_storage = gridfs.GridFS(db) # TODO: Investigate how it works in multiple processes in parallel. + self.file_storage = gridfs.GridFS(db, collection=MANIFESTATION_GRID_FS_COLLECTION_NAME) # TODO: Investigate how it works in multiple processes in parallel. self.collection = db[self._collection_name] self.collection.create_index([(AGGREGATE_REFERENCE_ID, ASCENDING)]) self.file_storage_collection = db[FILE_STORAGE_COLLECTION_NAME] From fd505a44f75dacb419dd711d7fb65e707e436e8d Mon Sep 17 00:00:00 2001 From: CaptainOfHacks <39195263+CaptainOfHacks@users.noreply.github.com> Date: Mon, 25 Sep 2023 23:08:42 +0300 Subject: [PATCH 7/8] Update notice_processor_pipelines.py --- dags/pipelines/notice_processor_pipelines.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dags/pipelines/notice_processor_pipelines.py b/dags/pipelines/notice_processor_pipelines.py index 78ad5b20e..a8d57ece2 100644 --- a/dags/pipelines/notice_processor_pipelines.py +++ b/dags/pipelines/notice_processor_pipelines.py @@ -17,7 +17,6 @@ def notice_normalisation_pipeline(notice: Notice, mongodb_client: MongoClient = indexed_notice = index_notice(notice=notice) try: normalised_notice = normalise_notice(notice=indexed_notice) - normalised_notice.update_status_to(new_status=NoticeStatus.NORMALISED_METADATA) return NoticePipelineOutput(notice=normalised_notice) except Exception as error_message: return NoticePipelineOutput(notice=indexed_notice, processed=False, From f4f848735dbdcc2435d24928827678d4c16c1e06 Mon Sep 17 00:00:00 2001 From: CaptainOfHacks <39195263+CaptainOfHacks@users.noreply.github.com> Date: Mon, 25 Sep 2023 23:09:27 +0300 Subject: [PATCH 8/8] Update DagBatchPipelineOperator.py --- dags/operators/DagBatchPipelineOperator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/operators/DagBatchPipelineOperator.py b/dags/operators/DagBatchPipelineOperator.py index f097d8e7c..77fb17a12 100644 --- a/dags/operators/DagBatchPipelineOperator.py +++ b/dags/operators/DagBatchPipelineOperator.py @@ -81,7 +81,7 @@ def single_notice_processor(self, notice_id: str, notice_repository: NoticeRepos error_message = result_notice_pipeline.error_message except Exception as exception_error_message: error_message = str(exception_error_message) - if error_message is not None: + if error_message: notice_normalised_metadata = notice.normalised_metadata if notice else None log_notice_error(message=error_message, notice_id=notice_id, domain_action=pipeline_name, notice_form_number=notice_normalised_metadata.form_number if notice_normalised_metadata else None,