Skip to content

Commit

Permalink
Merge pull request #41 from SandhraSokhal/PLFM-7926.1
Browse files (browse the repository at this point in the history)
add filter to only process success file download records
  • Loading branch information
xschildw authored Sep 29, 2023
2 parents 2c44f97 + 28e00d4 commit 7bef7ec
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -8,6 +8,7 @@
import gs_explode
import json
from backfill_utils import *
import re

args = getResolvedOptions(sys.argv,
["JOB_NAME", "DESTINATION_DATABASE_NAME", "SOURCE_DATABASE_NAME","DESTINATION_TABLE_NAME", "SOURCE_BULK_TABLE_NAME", "SOURCE_FILE_TABLE_NAME", "STACK",
Expand All @@ -29,7 +30,8 @@ def transform_bulk_download(dynamic_record):
file_summary = {
"file_handle_id": get_key_from_json_payload(file, "fileHandleId"),
"association_object_id": get_key_from_json_payload(file, "associateObjectId"),
"association_object_type": get_key_from_json_payload(file, "associateObjectType")
"association_object_type": get_key_from_json_payload(file, "associateObjectType"),
"status": get_key_from_json_payload(file, "status")
}
file_info.append(file_summary)
dynamic_record["payloads"] = file_info
Expand Down Expand Up @@ -119,10 +121,11 @@ def add_common_fields(json_payload, dynamic_record):
exploded_frame = bulk_mapped_frame.gs_explode(
colName="payloads", newCol="payload"
)

filtered_frame = exploded_frame.filter(
f=lambda row: (bool(re.match("SUCCESS", row["payload"]["status"]))))

bulk_final_frame = ApplyMapping.apply(
frame=exploded_frame,
frame=filtered_frame,
mappings=[
("timestamp", "bigint", "timestamp", "timestamp"),
("stack", "string", "stack", "string"),
Expand Down

0 comments on commit 7bef7ec

Please sign in to comment.