diff --git a/ejp_xml_pipeline/cli.py b/ejp_xml_pipeline/cli.py index e92b35d7..3fc5427c 100644 --- a/ejp_xml_pipeline/cli.py +++ b/ejp_xml_pipeline/cli.py @@ -66,9 +66,15 @@ def get_s3_client(): return session.create_client('s3') -def list_objects_with_pattern(s3_client, bucket: str, pattern: str) -> list: - """List objects in S3 using prefix and client-side pattern matching""" - # Get the prefix by taking everything before the first wildcard +def list_objects_with_pattern_and_timestamp( + s3_client, + bucket: str, + pattern: str, + latest_timestamp: datetime +) -> list: + """ + List objects in S3 matching pattern and modified after latest_timestamp + """ prefix = pattern.split('*')[0] paginator = s3_client.get_paginator('list_objects_v2') matching_objects = [] @@ -83,11 +89,12 @@ def list_objects_with_pattern(s3_client, bucket: str, pattern: str) -> list: if 'Contents' in page: for obj in page['Contents']: key = obj['Key'] - # Use fnmatch for shell-style pattern matching - if fnmatch.fnmatch(key, pattern): + last_modified = obj['LastModified'] + # Check both pattern match and timestamp + if (fnmatch.fnmatch(key, pattern) and last_modified > latest_timestamp): matching_objects.append({ 'name': key, - 'last_modified': obj['LastModified'] + 'last_modified': last_modified }) except botocore.exceptions.ClientError as err: LOGGER.error('Error listing objects with prefix %s: %s', prefix, err) @@ -105,11 +112,22 @@ def etl_s3_object_pattern( s3_client = get_s3_client() matching_files = {} - # For each pattern, get matching objects - for pattern in obj_pattern_with_latest_dates: - objects = list_objects_with_pattern(s3_client, s3_bucket_name, pattern) + # For each pattern and its timestamp, get matching objects + for pattern, latest_timestamp in obj_pattern_with_latest_dates.items(): + objects = list_objects_with_pattern_and_timestamp( + s3_client, + s3_bucket_name, + pattern, + latest_timestamp + ) if objects: matching_files[pattern] = objects + LOGGER.info( + 'Found %d new files for pattern %s modified after %s', + len(objects), + pattern, + latest_timestamp.isoformat() + ) # Process matching files for object_key_pattern, files_list in matching_files.items():