Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interface for Queues #220

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.gcp.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ google-api-python-client==1.12.5
google-cloud-monitoring==2.0.0
google-api-python-client-stubs
google-cloud-logging==2.0.0
google-cloud-pubsub==2.23.0
89 changes: 89 additions & 0 deletions sebs/aws/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import boto3

from sebs.cache import Cache
from sebs.faas.config import Resources
from sebs.faas.queue import Queue, QueueType


class SQS(Queue):
    """Amazon SQS implementation of the queue interface.

    Wraps a boto3 SQS client. For trigger queues, the queue is additionally
    registered as a Lambda event source so that messages invoke the function.
    """

    @staticmethod
    def typename() -> str:
        return "AWS.SQS"

    @staticmethod
    def deployment_name():
        return "aws"

    @property
    def queue_url(self):
        # URL assigned by SQS on creation; set in create_queue().
        return self._queue_url

    def __init__(
        self,
        benchmark: str,
        queue_type: QueueType,
        session: boto3.session.Session,
        cache_client: Cache,
        resources: Resources,
        region: str
    ):
        super().__init__(benchmark, queue_type, region, cache_client, resources)
        self.client = session.client(
            "sqs",
            region_name=region,
        )

    def create_queue(self) -> None:
        """Create the queue; for trigger queues, also wire it to the Lambda.

        Note: the original annotation claimed ``-> str`` but nothing was
        returned; the queue URL is exposed via the ``queue_url`` property.
        """
        self.logging.debug(f"Creating queue {self.name}")

        self._queue_url = self.client.create_queue(QueueName=self.name)["QueueUrl"]
        queue_arn = self.client.get_queue_attributes(
            QueueUrl=self.queue_url,
            AttributeNames=["QueueArn"],
        )["Attributes"]["QueueArn"]

        self.logging.debug("Created queue")

        # Fix: the module-level QueueType enum is used here; the original
        # referenced Queue.QueueType, which does not exist on the base class.
        if self.queue_type == QueueType.TRIGGER:
            # Make it an actual trigger for the function. GCP and Azure use
            # different mechanisms so this is skipped for them.
            # NOTE(review): FunctionName is given the queue's name; confirm
            # the Lambda function really shares this exact name.
            mappings = self.client.list_event_source_mappings(
                EventSourceArn=queue_arn,
                FunctionName=self.name,
            )["EventSourceMappings"]
            if not mappings:
                self.client.create_event_source_mapping(
                    EventSourceArn=queue_arn,
                    FunctionName=self.name,
                    MaximumBatchingWindowInSeconds=1,
                )

    def remove_queue(self):
        """Delete the SQS queue."""
        self.logging.info(f"Deleting queue {self.name}")

        self.client.delete_queue(QueueUrl=self.queue_url)

        self.logging.info("Deleted queue")

    def send_message(self, serialized_message: str):
        """Enqueue a single pre-serialized message body."""
        self.client.send_message(
            QueueUrl=self.queue_url,
            MessageBody=serialized_message,
        )
        self.logging.info(f"Sent message to queue {self.name}")

    def receive_message(self) -> str:
        """Poll for one message (up to 5 s) and return its body.

        Returns None when no message arrives within the wait window.
        NOTE(review): the received message is never deleted, so SQS will
        redeliver it after the visibility timeout — confirm this is intended.
        """
        self.logging.info(f"Pulling a message from {self.name}")

        response = self.client.receive_message(
            QueueUrl=self.queue_url,
            MessageSystemAttributeNames=["SentTimestamp"],
            MaxNumberOfMessages=1,
            MessageAttributeNames=["All"],
            WaitTimeSeconds=5,
        )

        if "Messages" not in response:
            self.logging.info("No messages to be received")
            return

        self.logging.info(f"Received a message from {self.name}")
        return response["Messages"][0]["Body"]
78 changes: 78 additions & 0 deletions sebs/azure/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from sebs.cache import Cache
from sebs.faas.config import Resources
from sebs.faas.queue import Queue, QueueType

from azure.core.exceptions import ResourceExistsError
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
from azure.storage.queue import QueueClient


class AzureQueue(Queue):
    """Azure Storage Queue implementation of the queue interface.

    Uses a QueueClient authenticated through DefaultAzureCredential against
    the given storage account's queue endpoint.
    """

    @staticmethod
    def typename() -> str:
        return "Azure.Queue"

    @staticmethod
    def deployment_name():
        return "azure"

    @property
    def storage_account(self) -> str:
        assert self._storage_account
        return self._storage_account

    @property
    def account_url(self) -> str:
        # Queue-service endpoint for the configured storage account.
        return f"https://{self.storage_account}.queue.core.windows.net"

    def __init__(
        self,
        benchmark: str,
        queue_type: QueueType,
        cache_client: Cache,
        resources: Resources,
        region: str,
        storage_account: str,
    ):
        default_credential = DefaultAzureCredential()

        super().__init__(benchmark, queue_type, region, cache_client, resources)
        self._storage_account = storage_account
        self.client = QueueClient(self.account_url,
                                  queue_name=self.name,
                                  credential=default_credential)

    def create_queue(self):
        """Create the queue; reuse it when it already exists."""
        self.logging.info(f"Creating queue {self.name}")

        try:
            self.client.create_queue()
            self.logging.info("Created queue")
        except ResourceExistsError:
            self.logging.info("Queue already exists, reusing...")

    def remove_queue(self):
        """Delete the queue."""
        self.logging.info(f"Deleting queue {self.name}")

        self.client.delete_queue()

        self.logging.info("Deleted queue")

    def send_message(self, serialized_message: str):
        """Enqueue a single pre-serialized message body."""
        self.client.send_message(serialized_message)
        # Fix: the original logged self.queue_name, an attribute that does
        # not exist (the base class exposes 'name') -> AttributeError.
        self.logging.info(f"Sent message to queue {self.name}")

    def receive_message(self) -> str:
        """Fetch at most one message and return its content.

        Returns None when the queue is empty.
        NOTE(review): messages are not deleted after receipt, so they become
        visible again after the visibility timeout — confirm this is intended.
        """
        self.logging.info(f"Pulling a message from {self.name}")

        # Fix: receive_messages() returns a paged iterator (ItemPaged), which
        # does not support len(); iterate to take at most one message instead.
        response = self.client.receive_messages(
            max_messages=1,
            timeout=5,
        )
        for message in response:
            return message.content

        self.logging.info("No messages to be received")
60 changes: 60 additions & 0 deletions sebs/faas/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from abc import ABC
from abc import abstractmethod
from enum import Enum

from sebs.faas.config import Resources
from sebs.cache import Cache
from sebs.utils import LoggingBase

class QueueType(str, Enum):
    """Role a queue plays in a benchmark.

    TRIGGER queues feed messages into a function; RESULT queues collect the
    function's output. The str mixin makes members usable directly in names.
    """

    TRIGGER = "trigger"
    RESULT = "result"


class Queue(ABC, LoggingBase):
    """Abstract base class for platform-specific message queues.

    Concrete subclasses (AWS SQS, Azure Storage Queue, GCP Pub/Sub) implement
    creation, removal, and message send/receive. The queue name is derived
    from the benchmark name and the queue's role (trigger or result).
    """

    @staticmethod
    @abstractmethod
    def deployment_name() -> str:
        """Return the short platform identifier (e.g. "aws", "gcp")."""
        pass

    @property
    def cache_client(self) -> Cache:
        return self._cache_client

    @property
    def region(self) -> str:
        return self._region

    @property
    def queue_type(self) -> QueueType:
        return self._queue_type

    @property
    def name(self) -> str:
        # Derived queue name: "<benchmark>-<queue_type>".
        return self._name

    def __init__(self, benchmark: str, queue_type: QueueType, region: str, cache_client: Cache, resources: Resources):
        super().__init__()
        self._name = "{}-{}".format(benchmark, queue_type)
        self._queue_type = queue_type
        self._cache_client = cache_client
        # Tracks whether this queue's metadata came from the cache;
        # NOTE(review): set once here and never updated in this file.
        self._cached = False
        self._region = region
        self._cloud_resources = resources

    @abstractmethod
    def create_queue(self):
        """Provision the queue on the target platform (idempotent in subclasses)."""
        pass

    @abstractmethod
    def remove_queue(self):
        """Delete the queue and any associated resources."""
        pass

    @abstractmethod
    def send_message(self, serialized_message: str):
        """Enqueue a single pre-serialized message body."""
        pass

    @abstractmethod
    def receive_message(self) -> str:
        """Fetch one message body, or None when the queue is empty."""
        pass
111 changes: 111 additions & 0 deletions sebs/gcp/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from googleapiclient.discovery import build

from sebs.cache import Cache
from sebs.faas.config import Resources
from sebs.faas.queue import Queue, QueueType

from google.api_core import retry
from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1

import os


class GCPQueue(Queue):
    """Google Cloud Pub/Sub implementation of the queue interface.

    Pub/Sub has no single 'queue' resource: a topic receives published
    messages and a subscription delivers them, so both are managed together.
    """

    @staticmethod
    def typename() -> str:
        return "GCP.Queue"

    @staticmethod
    def deployment_name():
        return "gcp"

    @property
    def topic_name(self):
        # Fully-qualified topic path: projects/<project>/topics/<name>.
        return self._topic_name

    @property
    def subscription_name(self):
        # Fully-qualified subscription path: projects/<project>/subscriptions/<name>.
        return self._subscription_name

    @property
    def subscription_client(self):
        return self._subscription_client

    def __init__(
        self,
        benchmark: str,
        queue_type: QueueType,
        cache_client: Cache,
        resources: Resources,
        region: str
    ):
        super().__init__(benchmark, queue_type, region, cache_client, resources)
        self.client = pubsub_v1.PublisherClient()
        self._subscription_client = pubsub_v1.SubscriberClient()

        # NOTE(review): the project id is read from the environment here;
        # if it is already stored in config resources, prefer that instead.
        project_id = os.getenv('GOOGLE_CLOUD_PROJECT')
        self._topic_name = 'projects/{project_id}/topics/{topic}'.format(
            project_id=project_id,
            topic=self.name,
        )
        self._subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
            project_id=project_id,
            sub=self.name,
        )

    def create_queue(self):
        """Create the topic and its subscription; reuse either if present."""
        self.logging.info(f"Creating queue {self.name}")
        try:
            self.client.create_topic(name=self.topic_name)
            self.logging.info("Created queue")
        except AlreadyExists:
            self.logging.info("Queue already exists, reusing...")

        # GCP additionally needs a 'subscription' resource which is the
        # actual receiver of the messages. It is constructed and destructed
        # alongside the topic at all times.
        self.logging.info("Creating queue subscription")
        try:
            self.subscription_client.create_subscription(
                name=self.subscription_name,
                topic=self.topic_name
            )
            self.logging.info("Created queue subscription")
        except AlreadyExists:
            self.logging.info("Subscription already exists, reusing...")

    def remove_queue(self):
        """Delete the topic and its subscription."""
        # Fix: the original log line was missing a space before the name.
        self.logging.info(f"Deleting queue and associated subscription {self.name}")

        self.client.delete_topic(topic=self.topic_name)
        self.subscription_client.delete_subscription(subscription=self.subscription_name)

        self.logging.info("Deleted queue and associated subscription")

    def send_message(self, serialized_message: str):
        """Publish a single pre-serialized message to the topic.

        Fix: publish() requires bytes; the original called .decode() on the
        str argument, which raises AttributeError — encode it instead.
        """
        self.client.publish(self.topic_name, serialized_message.encode("utf-8"))
        self.logging.info(f"Sent message to queue {self.name}")

    # Receive messages through the 'pull' (sync) method.
    def receive_message(self) -> str:
        """Pull one message, acknowledge it, and return its payload.

        Returns None when no message arrives within the retry deadline.
        NOTE(review): message.data is bytes, not str — confirm callers
        decode it, or align the annotation with the other platforms.
        """
        self.logging.info(f"Pulling a message from {self.name}")

        response = self.subscription_client.pull(
            subscription=self.subscription_name,
            max_messages=1,
            retry=retry.Retry(deadline=5),
        )

        if not response.received_messages:
            self.logging.info("No messages to be received")
            return

        # Acknowledge the received message so it is not sent again.
        received_message = response.received_messages[0]
        self.subscription_client.acknowledge(
            subscription=self.subscription_name,
            ack_ids=[received_message.ack_id],
        )
        self.logging.info(f"Received a message from {self.name}")

        return received_message.message.data