[#64] Queue and Storage triggers for AWS, GCP and Azure #201

Open · wants to merge 21 commits into base: master
66 changes: 50 additions & 16 deletions benchmarks/wrappers/aws/python/handler.py
@@ -1,18 +1,39 @@

import datetime, io, json, os, sys, uuid

# Add current directory to allow location of packages
sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages'))

# TODO: usual trigger
# implement support for S3 and others
def handler(event, context):

income_timestamp = datetime.datetime.now().timestamp()

# Flag to indicate whether the measurements should be returned as an HTTP
# response or via a result queue.
return_http = True

# Queue trigger
if ("Records" in event and event["Records"][0]["eventSource"] == 'aws:sqs'):
Review comment (Collaborator):
One question here: are we certain we always receive a single event? do we need to add loop here?
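The reviewer's concern is valid: unless the event-source mapping is configured with a batch size of 1, SQS can deliver several records in one invocation, and reading only `Records[0]` silently drops the rest of the batch. A minimal sketch of a loop-based alternative (hypothetical helper, not part of this PR):

```python
import json

def extract_sqs_payloads(event: dict) -> list:
    """Collect every SQS record body instead of only the first one."""
    payloads = []
    for record in event.get("Records", []):
        if record.get("eventSource") == "aws:sqs":
            payloads.append(json.loads(record["body"]))
    return payloads

# Example batch carrying two messages:
event = {"Records": [
    {"eventSource": "aws:sqs", "body": '{"id": 1}'},
    {"eventSource": "aws:sqs", "body": '{"id": 2}'},
]}
print(extract_sqs_payloads(event))  # → [{'id': 1}, {'id': 2}]
```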

event = json.loads(event["Records"][0]["body"])

return_http = False

# Storage trigger
if ("Records" in event and "s3" in event["Records"][0]):
bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
file_name = event["Records"][0]["s3"]["object"]["key"]

from function import storage
storage_inst = storage.storage.get_instance()

obj = storage_inst.get_object(bucket_name, file_name)
event = json.loads(obj['Body'].read())

return_http = False

# HTTP trigger with API Gateway
if 'body' in event:
event = json.loads(event['body'])

req_id = context.aws_request_id
event['request-id'] = req_id
event['income-timestamp'] = income_timestamp
@@ -55,17 +76,30 @@ def handler(event, context):
if "cold_start" in os.environ:
cold_start_var = os.environ["cold_start"]

return {
'statusCode': 200,
'body': json.dumps({
'begin': begin.strftime('%s.%f'),
'end': end.strftime('%s.%f'),
'results_time': results_time,
'is_cold': is_cold,
'result': log_data,
'request_id': context.aws_request_id,
'cold_start_var': cold_start_var,
'container_id': container_id,
})
}
stats = json.dumps({
'begin': begin.strftime('%s.%f'),
'end': end.strftime('%s.%f'),
'results_time': results_time,
'is_cold': is_cold,
'result': log_data,
'request_id': context.aws_request_id,
'cold_start_var': cold_start_var,
'container_id': container_id,
})

# HTTP or library trigger: return an HTTP response.
if (return_http):
return {
'statusCode': 200,
'body': stats
}

# Queue or storage trigger: return via a result queue.
arn = context.invoked_function_arn.split(":")
region = arn[3]
account_id = arn[4]
queue_name = f"{arn[6]}-result"

from function import queue
queue_client = queue.queue(queue_name, account_id, region)
queue_client.send_message(stats)
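The indices used above assume the standard unqualified Lambda ARN layout `arn:aws:lambda:<region>:<account-id>:function:<name>`; a quick sanity check of that split (sample ARN is illustrative):

```python
# Colon-separated fields of a Lambda ARN:
# arn : aws : lambda : region : account-id : function : name
arn = "arn:aws:lambda:us-east-1:123456789012:function:my-benchmark"
parts = arn.split(":")

region = parts[3]                   # 'us-east-1'
account_id = parts[4]               # '123456789012'
queue_name = f"{parts[6]}-result"   # 'my-benchmark-result'

print(region, account_id, queue_name)
```

Qualified ARNs (with a version or alias) append one more `:`-separated segment, so indices 3, 4, and 6 still resolve to the same fields.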
14 changes: 14 additions & 0 deletions benchmarks/wrappers/aws/python/queue.py
@@ -0,0 +1,14 @@
import boto3

class queue:
client = None

def __init__(self, queue_name: str, account_id: str, region: str):
self.client = boto3.client('sqs', region_name=region)
self.queue_url = f"https://sqs.{region}.amazonaws.com/{account_id}/{queue_name}"

def send_message(self, message: str):
self.client.send_message(
QueueUrl=self.queue_url,
MessageBody=message,
)
Review comment on lines +3 to +14:

Rename the class and add error handling.

Consider renaming the class from queue to avoid conflicts with Python's built-in queue module. Additionally, add error handling for AWS client interactions to improve robustness.

Suggested class name change and error handling:

-import boto3
+import boto3
+from botocore.exceptions import ClientError

-class queue:
+class AWSQueue:
     client = None

     def __init__(self, queue_name: str, account_id: str, region: str):
         self.client = boto3.client('sqs', region_name=region)
         self.queue_url = f"https://sqs.{region}.amazonaws.com/{account_id}/{queue_name}"

     def send_message(self, message: str):
         try:
             self.client.send_message(
                 QueueUrl=self.queue_url,
                 MessageBody=message,
             )
         except ClientError as e:
             print(f"An error occurred: {e}")

3 changes: 3 additions & 0 deletions benchmarks/wrappers/aws/python/storage.py
@@ -46,6 +46,9 @@ def download_stream(self, bucket, file):
data = io.BytesIO()
self.client.download_fileobj(bucket, file, data)
return data.getbuffer()

def get_object(self, bucket, file):
return self.client.get_object(Bucket=bucket, Key=file)

def get_instance():
if storage.instance is None:
71 changes: 59 additions & 12 deletions benchmarks/wrappers/azure/python/handler.py
@@ -1,18 +1,70 @@

import datetime, io, json, os, uuid
import base64
Review comment:

Remove unused import.

The base64 import is unused and should be removed.

- import base64
Ruff F401: base64 imported but unused.

import datetime, io, json, logging, os, uuid

from azure.identity import ManagedIdentityCredential
from azure.storage.queue import QueueClient

import azure.functions as func


# TODO: usual trigger
# implement support for blob and others
def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
def handler_http(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
income_timestamp = datetime.datetime.now().timestamp()

req_json = req.get_json()
if 'connection_string' in req_json:
os.environ['STORAGE_CONNECTION_STRING'] = req_json['connection_string']

req_json['request-id'] = context.invocation_id
req_json['income-timestamp'] = income_timestamp

return func.HttpResponse(measure(req_json), mimetype="application/json")

def handler_queue(msg: func.QueueMessage, context: func.Context):
income_timestamp = datetime.datetime.now().timestamp()

logging.info('Python queue trigger function processed a queue item.')
payload = msg.get_json()

payload['request-id'] = context.invocation_id
payload['income-timestamp'] = income_timestamp

stats = measure(payload)

queue_name = f"{os.getenv('WEBSITE_SITE_NAME')}-result"
storage_account = os.getenv('STORAGE_ACCOUNT')
logging.info(queue_name)
logging.info(storage_account)

from . import queue
queue_client = queue.queue(queue_name, storage_account)
queue_client.send_message(stats)

def handler_storage(blob: func.InputStream, context: func.Context):
income_timestamp = datetime.datetime.now().timestamp()

logging.info('Python Blob trigger function processed %s', blob.name)
payload = json.loads(blob.readline().decode('utf-8'))

payload['request-id'] = context.invocation_id
payload['income-timestamp'] = income_timestamp

stats = measure(payload)

queue_name = f"{os.getenv('WEBSITE_SITE_NAME')}-result"
storage_account = os.getenv('STORAGE_ACCOUNT')
logging.info(queue_name)
logging.info(storage_account)

from . import queue
queue_client = queue.queue(queue_name, storage_account)
queue_client.send_message(stats)

# Contains generic logic for gathering measurements for the function at hand,
# given a request JSON. Used by all handlers, regardless of the trigger.
def measure(req_json) -> str:
req_id = req_json['request-id']

begin = datetime.datetime.now()
# We are deployed in the same directory
from . import function
@@ -30,7 +82,6 @@ def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
from . import storage
storage_inst = storage.storage.get_instance()
b = req_json.get('logs').get('bucket')
req_id = context.invocation_id
storage_inst.upload_stream(b, '{}.json'.format(req_id),
io.BytesIO(json.dumps(log_data).encode('utf-8')))
results_end = datetime.datetime.now()
@@ -58,8 +109,7 @@ def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
cold_marker = True
is_cold_worker = True

return func.HttpResponse(
json.dumps({
return json.dumps({
'begin': begin.strftime('%s.%f'),
'end': end.strftime('%s.%f'),
'results_time': results_time,
@@ -68,8 +118,5 @@ def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
'is_cold_worker': is_cold_worker,
'container_id': container_id,
'environ_container_id': os.environ['CONTAINER_NAME'],
'request_id': context.invocation_id
}),
mimetype="application/json"
)

'request_id': req_id
})
15 changes: 15 additions & 0 deletions benchmarks/wrappers/azure/python/queue.py
@@ -0,0 +1,15 @@
from azure.identity import ManagedIdentityCredential
from azure.storage.queue import QueueClient

class queue:
client = None

def __init__(self, queue_name: str, storage_account: str):
account_url = f"https://{storage_account}.queue.core.windows.net"
managed_credential = ManagedIdentityCredential()
self.client = QueueClient(account_url,
queue_name=queue_name,
credential=managed_credential)

def send_message(self, message: str):
self.client.send_message(message)
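One caveat worth noting (an observation about Azure defaults, not something this PR addresses): Azure Functions queue bindings default to base64 message encoding, while `QueueClient.send_message` sends the raw string. If the result queue were ever consumed by another Functions queue trigger, the payload may need to be base64-encoded first; a stdlib-only sketch of the round trip:

```python
import base64
import json

# Producer side: base64-encode the JSON stats string before enqueueing,
# matching the "messageEncoding": "base64" default of Functions bindings.
stats = json.dumps({"is_cold": False, "results_time": 0})
encoded = base64.b64encode(stats.encode("utf-8")).decode("ascii")

# Consumer side: reverse the transformation.
decoded = json.loads(base64.b64decode(encoded).decode("utf-8"))
print(decoded == json.loads(stats))  # → True
```

With the SDK itself, the same effect can be had by constructing the client with `message_encode_policy=TextBase64EncodePolicy()` from `azure.storage.queue`.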
Review comment on lines +14 to +15:

Add error handling in send_message.

The send_message method currently does not handle any potential exceptions that might occur during the message sending process. It's recommended to add error handling to improve the robustness of this method.

Consider wrapping the call in a try-except block:

     def send_message(self, message: str):
+        try:
             self.client.send_message(message)
+        except Exception as e:
+            logging.error(f"Failed to send message: {e}")
+            raise
Review comment on lines +4 to +15:

Refactor class name and improve thread safety.

The class name queue should be in CamelCase to adhere to Python's PEP 8 style guide. Also, using a class-level variable for client is not thread-safe. Consider initializing the client in the __init__ method to ensure each instance has its own client.

Here's a suggested refactor:

-class queue:
+class Queue:
-    client = None
+
     def __init__(self, queue_name: str, storage_account: str):
+        self.client = None
         account_url = f"https://{storage_account}.queue.core.windows.net"
         managed_credential = ManagedIdentityCredential()
-        self.client = QueueClient(account_url,
+        self.client = QueueClient(account_url,
                             queue_name=queue_name,
                             credential=managed_credential)

69 changes: 64 additions & 5 deletions benchmarks/wrappers/gcp/python/handler.py
@@ -1,16 +1,75 @@
import datetime, io, json, os, uuid, sys
import base64, datetime, io, json, os, uuid, sys

sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages'))
from google.cloud import storage as gcp_storage
Review comment:

Remove unused import.

The google.cloud.storage import is unused and should be removed.

- from google.cloud import storage as gcp_storage
Ruff F401: google.cloud.storage imported but unused.


sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages'))

def handler(req):
def handler_http(req):
income_timestamp = datetime.datetime.now().timestamp()
req_id = req.headers.get('Function-Execution-Id')


req_json = req.get_json()
req_json['request-id'] = req_id
req_json['income-timestamp'] = income_timestamp

return measure(req_json), 200, {'ContentType': 'application/json'}

def handler_queue(data, context):
income_timestamp = datetime.datetime.now().timestamp()

serialized_payload = data.get('data')
payload = json.loads(base64.b64decode(serialized_payload).decode("utf-8"))

payload['request-id'] = context.event_id
payload['income-timestamp'] = income_timestamp

stats = measure(payload)

# Retrieve the project id and construct the result queue name.
project_id = context.resource.split("/")[1]
topic_name = f"{context.resource.split('/')[3]}-result"

from function import queue
queue_client = queue.queue(topic_name, project_id)
queue_client.send_message(stats)
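The `base64.b64decode(data.get('data'))` step above mirrors how background-function Pub/Sub events carry the message body: a base64-encoded string in the `data` field. A self-contained round trip (payload contents are illustrative):

```python
import base64
import json

# Build a Pub/Sub-style event the way the platform would deliver it:
original = {"size": 1}
event = {
    "data": base64.b64encode(json.dumps(original).encode("utf-8")).decode("ascii")
}

# handler_queue's unpacking of that event:
payload = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
print(payload == original)  # → True
```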

def handler_storage(data, context):
income_timestamp = datetime.datetime.now().timestamp()

bucket_name = data.get('bucket')
name = data.get('name')
filepath = '/tmp/bucket_contents'

from function import storage
storage_inst = storage.storage.get_instance()
storage_inst.download(bucket_name, name, filepath)

payload = {}

with open(filepath, 'r') as fp:
payload = json.load(fp)

payload['request-id'] = context.event_id
payload['income-timestamp'] = income_timestamp

stats = measure(payload)

# Retrieve the project id and construct the result queue name.
from google.auth import default
# Used to be an env var, now we need an additional request to the metadata
# server to retrieve it.
_, project_id = default()
topic_name = f"{context.resource['name'].split('/')[3]}-result"

from function import queue
queue_client = queue.queue(topic_name, project_id)
queue_client.send_message(stats)
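Note the asymmetry the two handlers rely on: in `handler_queue`, `context.resource` is a flat string of the shape `projects/<project-id>/topics/<topic-name>`, while in `handler_storage` it is a dict (hence `context.resource['name']`). A quick check of the string indices used in `handler_queue` (sample resource is illustrative):

```python
# handler_queue assumes: projects/<project-id>/topics/<topic-name>
resource = "projects/my-project/topics/my-benchmark"
parts = resource.split("/")

project_id = parts[1]               # 'my-project'
topic_name = f"{parts[3]}-result"   # 'my-benchmark-result'

print(project_id, topic_name)
```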

Review comment on lines +36 to +67:

Function handler_storage is well-implemented but consider improving error handling.

The function correctly handles storage events, including retrieving and processing data from a Google Cloud Storage bucket. Consider adding error handling for potential issues during data retrieval and processing to enhance robustness.

Consider adding error handling around the storage interactions:

try:
    storage_inst.download(bucket_name, name, filepath)
except Exception as e:
    # Handle exceptions appropriately
    logging.error(f"Error downloading from bucket: {e}")
    return {'error': str(e)}, 500

# Contains generic logic for gathering measurements for the function at hand,
# given a request JSON. Used by all handlers, regardless of the trigger.
def measure(req_json) -> str:
req_id = req_json['request-id']

begin = datetime.datetime.now()
# We are deployed in the same directory
from function import function
@@ -61,4 +120,4 @@ def handler(req):
'request_id': req_id,
'cold_start_var': cold_start_var,
'container_id': container_id,
}), 200, {'ContentType': 'application/json'}
})
14 changes: 14 additions & 0 deletions benchmarks/wrappers/gcp/python/queue.py
@@ -0,0 +1,14 @@
from google.cloud import pubsub_v1

class queue:
client = None

def __init__(self, topic_name: str, project_id: str):
self.client = pubsub_v1.PublisherClient()
self.topic_name = 'projects/{project_id}/topics/{topic}'.format(
project_id=project_id,
topic=topic_name,
)

def send_message(self, message: str):
self.client.publish(self.topic_name, message.encode("utf-8"))
Review comment on lines +3 to +14:

Rename the class and add error handling.

Consider renaming the class from queue to avoid conflicts with Python's built-in queue module. Additionally, add error handling for GCP client interactions to improve robustness.

Suggested class name change and error handling:

-from google.cloud import pubsub_v1
+from google.cloud import pubsub_v1
+from google.api_core.exceptions import GoogleAPICallError, RetryError

-class queue:
+class GCPQueue:
     client = None

     def __init__(self, topic_name: str, project_id: str):
         self.client = pubsub_v1.PublisherClient()
         self.topic_name = 'projects/{project_id}/topics/{topic}'.format(
             project_id=project_id,
             topic=topic_name,
         )

     def send_message(self, message: str):
         try:
             self.client.publish(self.topic_name, message.encode("utf-8"))
         except (GoogleAPICallError, RetryError) as e:
             print(f"An error occurred: {e}")

2 changes: 1 addition & 1 deletion config/example.json
@@ -6,7 +6,7 @@
"download_results": false,
"runtime": {
"language": "python",
"version": "3.7"
"version": "3.9"
},
"type": "invocation-overhead",
"perf-cost": {
13 changes: 9 additions & 4 deletions config/systems.json
@@ -71,7 +71,8 @@
"deployment": {
"files": [
"handler.py",
"storage.py"
"storage.py",
"queue.py"
],
"packages": []
}
@@ -112,10 +113,13 @@
"deployment": {
"files": [
"handler.py",
"storage.py"
"storage.py",
"queue.py"
],
"packages": [
"azure-storage-blob"
"azure-storage-blob",
"azure-storage-queue",
"azure-identity"
]
}
},
@@ -165,7 +169,8 @@
"deployment": {
"files": [
"handler.py",
"storage.py"
"storage.py",
"queue.py"
],
"packages": [
"google-cloud-storage"
4 changes: 3 additions & 1 deletion docs/modularity.md
@@ -267,7 +267,8 @@ Check other platforms to see how configuration is defined, for example, for AWS:
"deployment": {
"files": [
"handler.py",
"storage.py"
"storage.py",
"queue.py"
Review comment on lines +270 to +271:

Review the addition of queue.py in the deployment configuration.

The addition of queue.py alongside storage.py in the deployment configuration is a significant change. It suggests an expansion in the capabilities of the deployment, possibly to handle queue-related operations. This change should be clearly documented and justified in the context of the benchmarks that would utilize this new configuration.

Ensure that the documentation explicitly mentions how queue.py is used in the deployment process and which benchmarks or scenarios require it. This will help maintain clarity and usefulness of the documentation.

],
"packages": []
}
@@ -303,6 +304,7 @@ Implement this step in the following function:
language_version: str,
benchmark: str,
is_cached: bool,
trigger: Optional[Trigger.TriggerType],
Review comment:

Review the addition of the trigger parameter in the function signature.

The addition of the trigger parameter to the function signature is a critical update. It allows the function to handle an optional trigger type, which can significantly alter its behavior based on the presence or absence of this parameter.

Verify that all references to this function in the documentation and codebase have been updated to reflect this new parameter. Additionally, provide examples or scenarios where this parameter would be used, enhancing the understanding and applicability of this change.

) -> Tuple[str, int]
```
