[#64] Queue and Storage triggers for AWS, GCP and Azure #201

Status: Open · wants to merge 21 commits into base: master · showing changes from 15 commits
19 changes: 16 additions & 3 deletions benchmarks/wrappers/aws/python/handler.py
@@ -1,18 +1,31 @@

import datetime, io, json, os, sys, uuid

# Add current directory to allow location of packages
sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages'))

# TODO: usual trigger
# implement support for S3 and others
def handler(event, context):

income_timestamp = datetime.datetime.now().timestamp()

# Queue trigger
if ("Records" in event and event["Records"][0]["eventSource"] == 'aws:sqs'):
Collaborator: One question here: are we certain we always receive a single event? Do we need to add a loop here?

event = json.loads(event["Records"][0]["body"])

# Storage trigger
if ("Records" in event and "s3" in event["Records"][0]):
bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
file_name = event["Records"][0]["s3"]["object"]["key"]

from function import storage
storage_inst = storage.storage.get_instance()

obj = storage_inst.get_object(bucket_name, file_name)
event = json.loads(obj['Body'].read())

# HTTP trigger with API Gateway
if 'body' in event:
event = json.loads(event['body'])

req_id = context.aws_request_id
event['request-id'] = req_id
event['income-timestamp'] = income_timestamp
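On the reviewer's question above: SQS can deliver multiple records in a single Lambda invocation unless the event source mapping is configured with a batch size of 1, so iterating over `Records` is the safer shape. A minimal sketch (the helper name `extract_sqs_payloads` is hypothetical, not part of this PR):

```python
import json

def extract_sqs_payloads(event):
    """Return one decoded payload per SQS record. SQS may batch
    several records into one invocation, so we loop instead of
    reading only Records[0]."""
    if "Records" in event and event["Records"][0].get("eventSource") == "aws:sqs":
        return [json.loads(record["body"]) for record in event["Records"]]
    # Not an SQS event: pass the event through unchanged.
    return [event]
```

The handler would then run its measurement path once per returned payload.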
3 changes: 3 additions & 0 deletions benchmarks/wrappers/aws/python/storage.py
@@ -46,6 +46,9 @@ def download_stream(self, bucket, file):
data = io.BytesIO()
self.client.download_fileobj(bucket, file, data)
return data.getbuffer()

def get_object(self, bucket, file):
return self.client.get_object(Bucket=bucket, Key=file)
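For context, boto3's `get_object` returns a dict whose `Body` is a streaming object, which is what the AWS handler reads and JSON-decodes. A self-contained sketch with a stub standing in for the boto3 client (the stub class and the bucket/key names are illustrative only):

```python
import io
import json

class StubS3Client:
    """Illustrative stand-in for boto3's S3 client."""
    def get_object(self, Bucket, Key):
        # Real boto3 returns {'Body': StreamingBody, ...};
        # io.BytesIO mimics the .read() interface used here.
        return {"Body": io.BytesIO(b'{"size": 1024}')}

client = StubS3Client()
obj = client.get_object(Bucket="benchmark-data", Key="input.json")
event = json.loads(obj["Body"].read())
```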

def get_instance():
if storage.instance is None:
24 changes: 20 additions & 4 deletions benchmarks/wrappers/azure/python/handler.py
@@ -1,12 +1,11 @@

import datetime, io, json, os, uuid
import base64
Review comment: Remove unused import. The base64 import is unused and should be removed (Ruff F401).

import datetime, io, json, logging, os, uuid

import azure.functions as func


# TODO: usual trigger
# implement support for blob and others
def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
def handler_http(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
income_timestamp = datetime.datetime.now().timestamp()
req_json = req.get_json()
if 'connection_string' in req_json:
@@ -73,3 +72,20 @@ def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
mimetype="application/json"
)

def handler_queue(msg: func.QueueMessage):
logging.info('Python queue trigger function processed a queue item.')
payload = msg.get_body().decode('utf-8')

from . import function
ret = function.handler(payload)
Review comment: Remove unused variable ret. The ret variable is assigned but never used (Ruff F841).


# TODO(oana)
Collaborator: Are we missing something here?

Collaborator: It looks like this is missing the core part of making measurements and returning values. Maybe we can just do a bit of restructuring to have a single invocation + measurement code, called from the three trigger interfaces?

Collaborator (author): Just pushed the measurements infrastructure; all invocations are end-to-end complete.
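The restructuring discussed here could take the shape of one shared measurement core that every trigger-specific entry point delegates to after decoding its own payload format. A hedged sketch (the function name and return shape are hypothetical; the pushed implementation may differ):

```python
import datetime

def run_with_measurements(handler, payload):
    """Shared invocation + measurement core: each trigger entry point
    (HTTP, queue, storage) only decodes its payload and delegates here,
    so timing logic lives in exactly one place."""
    begin = datetime.datetime.now()
    result = handler(payload)
    end = datetime.datetime.now()
    return {
        "begin": begin.timestamp(),
        "end": end.timestamp(),
        "result": result,
    }
```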


def handler_storage(blob: func.InputStream):
logging.info('Python Blob trigger function processed %s', blob.name)
payload = blob.readline().decode('utf-8') # TODO(oana)

from . import function
ret = function.handler(payload)
Review comment: Remove unused variable ret. The ret variable is assigned but never used (Ruff F841).


# TODO(oana)
35 changes: 32 additions & 3 deletions benchmarks/wrappers/gcp/python/handler.py
@@ -1,9 +1,10 @@
import datetime, io, json, os, uuid, sys
import base64, datetime, io, json, os, uuid, sys

sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages'))
from google.cloud import storage as gcp_storage
Review comment: Remove unused import. The google.cloud.storage import is unused and should be removed (Ruff F401).

sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages'))

def handler(req):
def handler_http(req):
income_timestamp = datetime.datetime.now().timestamp()
req_id = req.headers.get('Function-Execution-Id')

@@ -62,3 +63,31 @@ def handler(req):
'cold_start_var': cold_start_var,
'container_id': container_id,
}), 200, {'ContentType': 'application/json'}

def handler_queue(data, context):
serialized_payload = data.get('data')
payload = json.loads(base64.b64decode(serialized_payload).decode("utf-8"))

from function import function
ret = function.handler(payload)
Review comment: Remove unused variable ret. The ret variable is assigned but never used (Ruff F841).

# TODO(oana)
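GCP background functions deliver Pub/Sub messages with the body base64-encoded under the `data` key, which is what `handler_queue` above decodes. The round trip as a small, testable sketch (the helper name is illustrative):

```python
import base64
import json

def decode_pubsub_payload(data: dict) -> dict:
    """Reverse Pub/Sub's encoding: the message body arrives
    base64-encoded under the 'data' key of the event dict."""
    serialized = data.get("data")
    return json.loads(base64.b64decode(serialized).decode("utf-8"))
```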

def handler_storage(data, context):
bucket_name = data.get('bucket')
name = data.get('name')
filepath = '/tmp/bucket_contents'

from function import storage
storage_inst = storage.storage.get_instance()
storage_inst.download(bucket_name, name, filepath)

payload = {}

with open(filepath, 'r') as fp:
payload = json.load(fp)

from function import function
ret = function.handler(payload)
Review comment: Remove unused variable ret. The ret variable is assigned but never used (Ruff F841).


# TODO(oana)
2 changes: 1 addition & 1 deletion config/example.json
@@ -6,7 +6,7 @@
"download_results": false,
"runtime": {
"language": "python",
"version": "3.7"
"version": "3.9"
},
"type": "invocation-overhead",
"perf-cost": {
3 changes: 2 additions & 1 deletion config/systems.json
@@ -18,7 +18,8 @@
"python": {
"base_images": {
"3.7": "python:3.7-slim",
"3.8": "python:3.8-slim"
"3.8": "python:3.8-slim",
"3.9": "python:3.9-slim"
},
"images": [
"run",
1 change: 1 addition & 0 deletions docs/modularity.md
@@ -303,6 +303,7 @@ Implement this step in the following function:
language_version: str,
benchmark: str,
is_cached: bool,
trigger: Optional[Trigger.TriggerType],
Review comment: The new trigger parameter in the function signature is a critical update; it lets the function adapt its behavior to an optional trigger type. Verify that all references to this function in the documentation and codebase have been updated, and consider adding examples of when this parameter is used.

) -> Tuple[str, int]
```

6 changes: 3 additions & 3 deletions docs/platforms.md
@@ -85,9 +85,9 @@ AZURE_SECRET_PASSWORD = XXXXXXXXXXXXX
You can pass the credentials either using the environment variables:

```
export AZURE_SECRET_APPLICATION_ID = XXXXXXXXXXXXXXXX
export AZURE_SECRET_TENANT = XXXXXXXXXXXX
export AZURE_SECRET_PASSWORD = XXXXXXXXXXXXX
export AZURE_SECRET_APPLICATION_ID=XXXXXXXXXXXXXXXX
export AZURE_SECRET_TENANT=XXXXXXXXXXXX
export AZURE_SECRET_PASSWORD=XXXXXXXXXXXXX
```

or in the JSON input configuration:
2 changes: 2 additions & 0 deletions requirements.azure.txt
@@ -1 +1,3 @@
azure-storage-blob==12.10.0
azure-storage-queue==12.9.0
azure-identity==1.16.0
17 changes: 15 additions & 2 deletions sebs.py
@@ -173,7 +173,7 @@ def benchmark():
@click.option("--repetitions", default=5, type=int, help="Number of experimental repetitions.")
@click.option(
"--trigger",
type=click.Choice(["library", "http"]),
type=click.Choice(["library", "http", "queue", "storage"]),
default="http",
help="Function trigger to be used.",
)
@@ -224,6 +224,10 @@ def invoke(
if image_tag_prefix is not None:
sebs_client.config.image_tag_prefix = image_tag_prefix

# Insert trigger into (experiment) config. Required by Azure when packaging.
trigger = trigger if trigger is not None else "http"
update_nested_dict(config, ["experiments", "trigger"], trigger)

experiment_config = sebs_client.get_experiment_config(config["experiments"])
update_nested_dict(config, ["experiments", "benchmark"], benchmark)
benchmark_obj = sebs_client.get_benchmark(
@@ -237,9 +241,18 @@
if timeout is not None:
benchmark_obj.benchmark_config.timeout = timeout

function_name = function_name if function_name else deployment_client.default_function_name(benchmark_obj)

# GCP and Azure only allow one trigger per function, so augment function name with
# trigger type: _http, _queue etc.
#
# Additionally, Azure requires for the trigger to be defined at deployment time.
if deployment_client.name() == "gcp" or deployment_client.name() == "azure":
function_name = "{}-{}".format(function_name, trigger)

func = deployment_client.get_function(
benchmark_obj,
function_name if function_name else deployment_client.default_function_name(benchmark_obj),
function_name,
)
storage = deployment_client.get_storage(replace_existing=experiment_config.update_storage)
input_config = benchmark_obj.prepare_input(storage=storage, size=benchmark_input_size)
20 changes: 18 additions & 2 deletions sebs/aws/aws.py
@@ -132,6 +132,7 @@ def package_code(
language_version: str,
benchmark: str,
is_cached: bool,
trigger: Optional[Trigger.TriggerType],
) -> Tuple[str, int]:

CONFIG_FILES = {
@@ -258,13 +259,19 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LambdaFun

def cached_function(self, function: Function):

from sebs.aws.triggers import LibraryTrigger
from sebs.aws.triggers import LibraryTrigger, QueueTrigger, StorageTrigger

for trigger in function.triggers(Trigger.TriggerType.LIBRARY):
trigger.logging_handlers = self.logging_handlers
cast(LibraryTrigger, trigger).deployment_client = self
for trigger in function.triggers(Trigger.TriggerType.HTTP):
trigger.logging_handlers = self.logging_handlers
for trigger in function.triggers(Trigger.TriggerType.QUEUE):
trigger.logging_handlers = self.logging_handlers
cast(QueueTrigger, trigger).deployment_client = self
for trigger in function.triggers(Trigger.TriggerType.STORAGE):
trigger.logging_handlers = self.logging_handlers
cast(StorageTrigger, trigger).deployment_client = self
Review comment (lines +263 to +275): Verify that the new QueueTrigger and StorageTrigger handling in cached_function does not interfere with the existing trigger types, and that logging handlers and deployment clients are applied consistently across all trigger types.

"""
Update function code and configuration on AWS.
@@ -484,10 +491,11 @@ def download_metrics(
)

def create_trigger(self, func: Function, trigger_type: Trigger.TriggerType) -> Trigger:
from sebs.aws.triggers import HTTPTrigger
from sebs.aws.triggers import HTTPTrigger, QueueTrigger, StorageTrigger

function = cast(LambdaFunction, func)

trigger: Trigger
if trigger_type == Trigger.TriggerType.HTTP:

api_name = "{}-http-api".format(function.name)
Expand All @@ -511,6 +519,14 @@ def create_trigger(self, func: Function, trigger_type: Trigger.TriggerType) -> T
elif trigger_type == Trigger.TriggerType.LIBRARY:
# should already exist
return func.triggers(Trigger.TriggerType.LIBRARY)[0]
elif trigger_type == Trigger.TriggerType.QUEUE:
trigger = QueueTrigger(func.name, self)
trigger.logging_handlers = self.logging_handlers
self.logging.info(f"Created Queue trigger for {func.name} function.")
elif trigger_type == Trigger.TriggerType.STORAGE:
trigger = StorageTrigger(func.name, self)
trigger.logging_handlers = self.logging_handlers
self.logging.info(f"Created Storage trigger for {func.name} function.")
else:
raise RuntimeError("Not supported!")

9 changes: 7 additions & 2 deletions sebs/aws/function.py
@@ -39,7 +39,7 @@ def serialize(self) -> dict:
@staticmethod
def deserialize(cached_config: dict) -> "LambdaFunction":
from sebs.faas.function import Trigger
from sebs.aws.triggers import LibraryTrigger, HTTPTrigger
from sebs.aws.triggers import LibraryTrigger, HTTPTrigger, QueueTrigger, StorageTrigger

cfg = FunctionConfig.deserialize(cached_config["config"])
ret = LambdaFunction(
@@ -55,7 +55,12 @@ def deserialize(cached_config: dict) -> "LambdaFunction":
for trigger in cached_config["triggers"]:
trigger_type = cast(
Trigger,
{"Library": LibraryTrigger, "HTTP": HTTPTrigger}.get(trigger["type"]),
{
"Library": LibraryTrigger,
"HTTP": HTTPTrigger,
"Queue": QueueTrigger,
"Storage": StorageTrigger,
}.get(trigger["type"]),
)
assert trigger_type, "Unknown trigger type {}".format(trigger["type"])
ret.add_trigger(trigger_type.deserialize(trigger))
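The pattern used here, a dict from the serialized type name to the trigger class, keeps deserialization open to new trigger kinds such as Queue and Storage. A simplified, self-contained sketch of the same round trip (these stand-in classes are illustrative, not the real sebs trigger classes):

```python
class QueueTrigger:
    type_name = "Queue"

    def __init__(self, fname):
        self.fname = fname

    def serialize(self):
        return {"type": self.type_name, "name": self.fname}

    @classmethod
    def deserialize(cls, obj):
        return cls(obj["name"])

class StorageTrigger(QueueTrigger):
    type_name = "Storage"

# Map serialized type names to classes, mirroring the cache lookup above.
TRIGGER_TYPES = {"Queue": QueueTrigger, "Storage": StorageTrigger}

def restore_trigger(cached: dict):
    trigger_cls = TRIGGER_TYPES.get(cached["type"])
    assert trigger_cls, "Unknown trigger type {}".format(cached["type"])
    return trigger_cls.deserialize(cached)
```

Adding a new trigger kind then only requires defining the class and registering it in the dict.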