Commit

Mark run ids archived to Glacier as processed (close #23)
chuwy committed May 5, 2017
1 parent d47346f commit 0c6867a
Showing 2 changed files with 36 additions and 6 deletions.
2 changes: 1 addition & 1 deletion setup.py
@@ -18,7 +18,7 @@
 
 setup(
     name='snowplow_analytics_sdk',
-    version='0.2.1',
+    version='0.2.2-a1',
     description='Snowplow Analytics Python SDK',
     author='Fred Blundun',
     url='https://github.com/snowplow/snowplow-python-analytics-sdk',
40 changes: 35 additions & 5 deletions snowplow_analytics_sdk/run_manifests.py
@@ -74,6 +74,7 @@ def create_manifest_table(dynamodb_client, table_name):
 def list_runids(s3_client, full_path):
     """Return list of all run ids inside S3 folder. It does not respect
     S3 pagination (`MaxKeys`) and returns **all** keys from bucket
+    and won't list any prefixes with objects archived to AWS Glacier
 
     Arguments:
     s3_client - boto3 S3 client (not service)
@@ -95,14 +96,17 @@ def list_runids(s3_client, full_path):
         if last_continuation_token is not None:
             options.update({
                 'ContinuationToken': last_continuation_token
             })
 
         response = s3_client.list_objects_v2(**options)
-        keys = [extract_run_id(key['Prefix']) for key in response.get('CommonPrefixes', [])]
+        keys = [extract_run_id(key['Prefix']) for key
+                in response.get('CommonPrefixes', [])]
         run_ids_buffer.extend([key for key in keys if key is not None])
         last_continuation_token = response.get('NextContinuationToken', None)
 
         if not response['IsTruncated']:
             listing_finished = True
 
-    return run_ids_buffer
+    non_archived_run_ids = [run_id for run_id in run_ids_buffer
+                            if not is_glacier(s3_client, bucket, run_id)]
+    return non_archived_run_ids


@@ -115,19 +119,45 @@ def split_full_path(path):
     ('mybucket', 'path-to-events/')
     >>> split_full_path('s3://mybucket')
     ('mybucket', None)
+    >>> split_full_path('s3n://snowplow-bucket/some/prefix/')
+    ('snowplow-bucket', 'some/prefix/')
     """
     if path.startswith('s3://'):
-        path = path.lstrip('s3://')
+        path = path[5:]
     elif path.startswith('s3n://'):
-        path = path.lstrip('s3n://')
+        path = path[6:]
+    elif path.startswith('s3a://'):
+        path = path[6:]
     else:
-        raise ValueError("S3 path should start with s3:// or s3n:// prefix")
+        raise ValueError("S3 path should start with s3://, s3n:// or "
+                         "s3a:// prefix")
     parts = path.split('/')
     bucket = parts[0]
     path = '/'.join(parts[1:])
     return bucket, normalize_prefix(path)


+def is_glacier(s3_client, bucket, prefix):
+    """Check if prefix is archived in Glacier, by checking the storage class
+    of the first objects inside that prefix
+
+    Arguments:
+    s3_client - boto3 S3 client (not service)
+    bucket - valid extracted bucket (without protocol and prefix)
+             example: snowplow-events-data
+    prefix - valid S3 prefix (usually, run_id)
+             example: snowplow-archive/enriched/archive/
+    """
+    response = s3_client.list_objects_v2(Bucket=bucket,
+                                         Prefix=prefix,
+                                         MaxKeys=3)  # 3 keys, so the check doesn't rely on _SUCCESS alone
+
+    for key in response['Contents']:
+        if key.get('StorageClass', 'STANDARD') == 'GLACIER':
+            return True
+    return False
+
+
 def extract_run_id(key):
     """Extract date part from run id
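A note on the split_full_path change above: str.lstrip treats its argument as a set of characters to remove, not as a literal prefix, so path.lstrip('s3://') would also eat any leading bucket-name characters drawn from the set {'s', '3', ':', '/'}. Slicing with path[5:] or path[6:] removes exactly the protocol. An illustrative doctest-style example (the bucket name here is hypothetical):

>>> 's3://snowplow-bucket/run=2017-05-05/'.lstrip('s3://')
'nowplow-bucket/run=2017-05-05/'
>>> 's3://snowplow-bucket/run=2017-05-05/'[5:]
'snowplow-bucket/run=2017-05-05/'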

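For context, a minimal usage sketch (not part of the commit) of how the new behavior fits together; the bucket and path are hypothetical:

import boto3

from snowplow_analytics_sdk.run_manifests import list_runids

s3_client = boto3.client('s3')

# list_runids paginates through the S3 common prefixes under the given path
# and, with this commit, drops every run id for which is_glacier() finds
# objects in the GLACIER storage class, treating archived runs as processed.
run_ids = list_runids(s3_client, 's3://acme-events-bucket/enriched/archive/')
for run_id in run_ids:
    print(run_id)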
0 comments on commit 0c6867a
