Skip to content

Commit

Permalink
Update DagBatchPipelineOperator.py
Browse files Browse the repository at this point in the history
  • Loading branch information
CaptainOfHacks committed Oct 25, 2023
1 parent f0d1751 commit d8fa9a6
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions dags/operators/DagBatchPipelineOperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,16 @@ def multithread_notice_processor(notice_id: str):
notice_event.notice_eforms_subtype = notice.normalised_metadata.eforms_subtype
notice_event.notice_status = str(notice.status)
logger.info(event_message=notice_event)

error_message = result_notice_pipeline.error_message
except Exception as e:
error_message = str(e)
notice_normalised_metadata = notice.normalised_metadata if notice else None
log_notice_error(message=error_message, notice_id=notice_id, domain_action=pipeline_name,
notice_form_number=notice_normalised_metadata.form_number if notice_normalised_metadata else None,
notice_status=notice.status if notice else None,
notice_eforms_subtype=notice_normalised_metadata.eforms_subtype if notice_normalised_metadata else None)
if error_message:
log_notice_error(message=error_message, notice_id=notice_id, domain_action=pipeline_name,
notice_form_number=notice_normalised_metadata.form_number if notice_normalised_metadata else None,
notice_status=notice.status if notice else None,
notice_eforms_subtype=notice_normalised_metadata.eforms_subtype if notice_normalised_metadata else None)

with ThreadPoolExecutor() as executor:
futures = [executor.submit(multithread_notice_processor, notice_id) for notice_id in
Expand Down

0 comments on commit d8fa9a6

Please sign in to comment.