diff --git a/src/scripts/backfill_jobs/backfill_old_dataware_house_file_download_records.py b/src/scripts/backfill_jobs/backfill_old_dataware_house_file_download_records.py index b8395f7..d33d813 100644 --- a/src/scripts/backfill_jobs/backfill_old_dataware_house_file_download_records.py +++ b/src/scripts/backfill_jobs/backfill_old_dataware_house_file_download_records.py @@ -8,6 +8,7 @@ import gs_explode import json from backfill_utils import * +import re args = getResolvedOptions(sys.argv, ["JOB_NAME", "DESTINATION_DATABASE_NAME", "SOURCE_DATABASE_NAME","DESTINATION_TABLE_NAME", "SOURCE_BULK_TABLE_NAME", "SOURCE_FILE_TABLE_NAME", "STACK", @@ -29,7 +30,8 @@ def transform_bulk_download(dynamic_record): file_summary = { "file_handle_id": get_key_from_json_payload(file, "fileHandleId"), "association_object_id": get_key_from_json_payload(file, "associateObjectId"), - "association_object_type": get_key_from_json_payload(file, "associateObjectType") + "association_object_type": get_key_from_json_payload(file, "associateObjectType"), + "status": get_key_from_json_payload(file, "status") } file_info.append(file_summary) dynamic_record["payloads"] = file_info @@ -119,10 +121,11 @@ def add_common_fields(json_payload, dynamic_record): exploded_frame = bulk_mapped_frame.gs_explode( colName="payloads", newCol="payload" ) - +filtered_frame = exploded_frame.filter( + f=lambda row: (bool(re.match("SUCCESS", row["payload"]["status"])))) bulk_final_frame = ApplyMapping.apply( - frame=exploded_frame, + frame=filtered_frame, mappings=[ ("timestamp", "bigint", "timestamp", "timestamp"), ("stack", "string", "stack", "string"),