From 8449c2692c8ed530e60697b9345ae9069520ce2e Mon Sep 17 00:00:00 2001 From: Dumitru Date: Fri, 29 Sep 2023 16:28:28 +0300 Subject: [PATCH 1/2] implement multiple publish channels --- dags/pipelines/notice_processor_pipelines.py | 35 ++++++++++++++++---- ted_sws/__init__.py | 10 +++++- 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/dags/pipelines/notice_processor_pipelines.py b/dags/pipelines/notice_processor_pipelines.py index a8d57ece..c1676089 100644 --- a/dags/pipelines/notice_processor_pipelines.py +++ b/dags/pipelines/notice_processor_pipelines.py @@ -113,17 +113,40 @@ def notice_publish_pipeline(notice: Notice, mongodb_client: MongoClient = None) from ted_sws.event_manager.services.log import log_notice_error from ted_sws import config notice.update_status_to(new_status=NoticeStatus.PACKAGED) - if config.S3_PUBLISH_ENABLED: - published_rdf_into_s3 = publish_notice_rdf_into_s3(notice=notice) + + unpublished_channels_count: int = 0 + if config.S3_PUBLISH_NOTICE_ENABLED: publish_notice_into_s3 = publish_notice_into_s3(notice=notice) - if not (published_rdf_into_s3 and publish_notice_into_s3): - log_notice_error(message="Can't load notice distilled rdf manifestation and METS package into S3 bucket!", + if not publish_notice_into_s3: + unpublished_channels_count += 1 + log_notice_error(message="Can't load METS package into S3 bucket!", + notice_id=notice.ted_id, notice_status=notice.status, + notice_form_number=notice.normalised_metadata.form_number, + notice_eforms_subtype=notice.normalised_metadata.eforms_subtype) + + + if config.S3_PUBLISH_NOTICE_RDF_ENABLED: + published_rdf_into_s3 = publish_notice_rdf_into_s3(notice=notice) + if not published_rdf_into_s3: + unpublished_channels_count += 1 + log_notice_error(message="Can't load notice distilled rdf manifestation into S3 bucket!", notice_id=notice.ted_id, notice_status=notice.status, notice_form_number=notice.normalised_metadata.form_number, notice_eforms_subtype=notice.normalised_metadata.eforms_subtype) + notice.set_is_eligible_for_publishing(eligibility=True) - result = publish_notice(notice=notice) - if result: + + if config.SFTP_PUBLISH_NOTICE_ENABLED: + sftp_publish_result = publish_notice(notice=notice) + if not sftp_publish_result: + unpublished_channels_count += 1 + log_notice_error(message="Can't load notice METS package into SFTP server!", + notice_id=notice.ted_id, notice_status=notice.status, + notice_form_number=notice.normalised_metadata.form_number, + notice_eforms_subtype=notice.normalised_metadata.eforms_subtype) + + + if unpublished_channels_count == 0: return NoticePipelineOutput(notice=notice) else: notice.set_is_eligible_for_publishing(eligibility=False) diff --git a/ted_sws/__init__.py b/ted_sws/__init__.py index 8fb802a2..05fbdd14 100644 --- a/ted_sws/__init__.py +++ b/ted_sws/__init__.py @@ -216,6 +216,10 @@ def SFTP_PRIVATE_KEY(self) -> str: sftp_private_key_base64 = base64.b64decode(str(sftp_private_key_base64)).decode(encoding="utf-8") return sftp_private_key_base64 + @env_property(config_resolver_class=AirflowAndEnvConfigResolver, default_value="false") + def SFTP_PUBLISH_NOTICE_ENABLED(self, config_value: str) -> bool: + return config_value.lower() in ["1", "true"] + class SPARQLConfig: @@ -258,7 +262,11 @@ def S3_PUBLISH_NOTICE_RDF_BUCKET(self, config_value: str) -> str: return config_value @env_property(config_resolver_class=AirflowAndEnvConfigResolver, default_value="false") - def S3_PUBLISH_ENABLED(self, config_value: str) -> bool: + def S3_PUBLISH_NOTICE_ENABLED(self, config_value: str) -> bool: + return config_value.lower() in ["1", "true"] + + @env_property(config_resolver_class=AirflowAndEnvConfigResolver, default_value="false") + def S3_PUBLISH_NOTICE_RDF_ENABLED(self, config_value: str) -> bool: return config_value.lower() in ["1", "true"] From ef84a41b9291da486db67c2aecbfb556a5027a07 Mon Sep 17 00:00:00 2001 From: Dumitru Date: Fri, 29 Sep 2023 17:25:31 +0300 Subject: [PATCH 2/2] change ENABLED config var with BUCKET and HOST --- dags/pipelines/notice_processor_pipelines.py | 6 +++--- ted_sws/__init__.py | 11 ----------- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/dags/pipelines/notice_processor_pipelines.py b/dags/pipelines/notice_processor_pipelines.py index c1676089..325fef4e 100644 --- a/dags/pipelines/notice_processor_pipelines.py +++ b/dags/pipelines/notice_processor_pipelines.py @@ -115,7 +115,7 @@ def notice_publish_pipeline(notice: Notice, mongodb_client: MongoClient = None) notice.update_status_to(new_status=NoticeStatus.PACKAGED) unpublished_channels_count: int = 0 - if config.S3_PUBLISH_NOTICE_ENABLED: + if config.S3_PUBLISH_NOTICE_BUCKET: publish_notice_into_s3 = publish_notice_into_s3(notice=notice) if not publish_notice_into_s3: unpublished_channels_count += 1 @@ -125,7 +125,7 @@ def notice_publish_pipeline(notice: Notice, mongodb_client: MongoClient = None) notice_eforms_subtype=notice.normalised_metadata.eforms_subtype) - if config.S3_PUBLISH_NOTICE_RDF_ENABLED: + if config.S3_PUBLISH_NOTICE_RDF_BUCKET: published_rdf_into_s3 = publish_notice_rdf_into_s3(notice=notice) if not published_rdf_into_s3: unpublished_channels_count += 1 @@ -136,7 +136,7 @@ def notice_publish_pipeline(notice: Notice, mongodb_client: MongoClient = None) notice.set_is_eligible_for_publishing(eligibility=True) - if config.SFTP_PUBLISH_NOTICE_ENABLED: + if config.SFTP_PUBLISH_HOST: sftp_publish_result = publish_notice(notice=notice) if not sftp_publish_result: unpublished_channels_count += 1 diff --git a/ted_sws/__init__.py b/ted_sws/__init__.py index 05fbdd14..641d29b9 100644 --- a/ted_sws/__init__.py +++ b/ted_sws/__init__.py @@ -216,10 +216,6 @@ def SFTP_PRIVATE_KEY(self) -> str: sftp_private_key_base64 = base64.b64decode(str(sftp_private_key_base64)).decode(encoding="utf-8") return sftp_private_key_base64 - @env_property(config_resolver_class=AirflowAndEnvConfigResolver, default_value="false") - def SFTP_PUBLISH_NOTICE_ENABLED(self, config_value: str) -> bool: - return config_value.lower() in ["1", "true"] - class SPARQLConfig: @@ -261,13 +257,6 @@ def S3_PUBLISH_NOTICE_BUCKET(self, config_value: str) -> str: def S3_PUBLISH_NOTICE_RDF_BUCKET(self, config_value: str) -> str: return config_value - @env_property(config_resolver_class=AirflowAndEnvConfigResolver, default_value="false") - def S3_PUBLISH_NOTICE_ENABLED(self, config_value: str) -> bool: - return config_value.lower() in ["1", "true"] - - @env_property(config_resolver_class=AirflowAndEnvConfigResolver, default_value="false") - def S3_PUBLISH_NOTICE_RDF_ENABLED(self, config_value: str) -> bool: - return config_value.lower() in ["1", "true"] class AirflowConfig: