From c0b1fe8c32eac8098078b92376008dfed29eb685 Mon Sep 17 00:00:00 2001 From: p3rf Team Date: Wed, 4 Aug 2021 12:44:01 -0700 Subject: [PATCH] Azure Service Bus Resource Class. PiperOrigin-RevId: 388760894 --- perfkitbenchmarker/messaging_service_util.py | 4 +- .../providers/azure/azure_service_bus.py | 207 +++++++++++++ tests/messaging_service_util_test.py | 7 + .../providers/azure/azure_service_bus_test.py | 293 ++++++++++++++++++ 4 files changed, 510 insertions(+), 1 deletion(-) create mode 100644 perfkitbenchmarker/providers/azure/azure_service_bus.py create mode 100644 tests/providers/azure/azure_service_bus_test.py diff --git a/perfkitbenchmarker/messaging_service_util.py b/perfkitbenchmarker/messaging_service_util.py index be3b709303..66eccab2ae 100644 --- a/perfkitbenchmarker/messaging_service_util.py +++ b/perfkitbenchmarker/messaging_service_util.py @@ -5,6 +5,7 @@ from perfkitbenchmarker import linux_virtual_machine from perfkitbenchmarker import messaging_service from perfkitbenchmarker.providers.aws import aws_sqs +from perfkitbenchmarker.providers.azure import azure_service_bus from perfkitbenchmarker.providers.gcp import gcp_pubsub @@ -19,5 +20,6 @@ def get_instance( instance = aws_sqs.AwsSqs(client) return instance elif cloud == 'Azure': - raise NotImplementedError + instance = azure_service_bus.AzureServiceBus(client) + return instance raise NotImplementedError diff --git a/perfkitbenchmarker/providers/azure/azure_service_bus.py b/perfkitbenchmarker/providers/azure/azure_service_bus.py new file mode 100644 index 0000000000..928192b0c3 --- /dev/null +++ b/perfkitbenchmarker/providers/azure/azure_service_bus.py @@ -0,0 +1,207 @@ +"""Azure Service Bus interface for resources. + +This class handles resource creation/cleanup for messaging service benchmark +on Azure Service Bus. +https://docs.microsoft.com/en-us/azure/service-bus-messaging/ +""" + +import json +import logging +import os +import time +from typing import Any, Dict + +from absl import flags +from perfkitbenchmarker import errors +from perfkitbenchmarker import virtual_machine +from perfkitbenchmarker import vm_util +from perfkitbenchmarker.messaging_service import MessagingService +from perfkitbenchmarker.messaging_service import SLEEP_TIME +from perfkitbenchmarker.messaging_service import TIMEOUT +from perfkitbenchmarker.providers import azure +from perfkitbenchmarker.providers.azure import azure_network + +MESSAGING_SERVICE_DATA_DIR = 'messaging_service' + +FLAGS = flags.FLAGS + + +class AzureServiceBus(MessagingService): + """Azure Service Bus Interface Class.""" + + def __init__(self, + client: virtual_machine.BaseVirtualMachine): + super().__init__(client) + self.location = FLAGS.zones[0] + self.topic_name = 'pkb-topic-{0}'.format(FLAGS.run_uri) + self.subscription_name = 'pkb-subscription-{0}'.format(FLAGS.run_uri) + self.namespace_name = 'pkb-namespace-{0}'.format(FLAGS.run_uri) + self.resource_group = azure_network.GetResourceGroup() + + def _create_topic(self): + """Creates Service Bus topic.""" + cmd = [ + azure.AZURE_PATH, + 'servicebus', + 'topic', + 'create', + '--name', self.topic_name, + '--namespace-name', self.namespace_name + ] + self.resource_group.args + vm_util.IssueCommand(cmd) + + def _topic_exists(self) -> bool: + """Checks whether Service Bus topic already exists.""" + cmd = [ + azure.AZURE_PATH, 'servicebus', 'topic', 'show', '--name', + self.topic_name, + '--namespace-name', self.namespace_name + ]+ self.resource_group.args + _, _, retcode = vm_util.IssueCommand( + cmd, raise_on_failure=False) + if retcode != 0: + return False + return True + + def _delete_topic(self): + """Handle Service Bus topic deletion.""" + cmd = [ + azure.AZURE_PATH, 'servicebus', 'topic', 'delete', '--name', + self.topic_name, + '--namespace-name', self.namespace_name + ] + self.resource_group.args + vm_util.IssueCommand(cmd) + + def _create_subscription(self): + """Creates Service Bus subscription.""" + cmd = [ + azure.AZURE_PATH, 'servicebus', 'topic', 'subscription', 'create', + '--name', self.subscription_name, '--topic-name', self.topic_name, + '--namespace-name', self.namespace_name + ] + self.resource_group.args + vm_util.IssueCommand(cmd) + + def _subscription_exists(self) -> bool: + """Checks whether Service Bus subscription already exists.""" + cmd = [ + azure.AZURE_PATH, 'servicebus', 'topic', 'subscription', 'show', + '--name', self.subscription_name, '--topic-name', self.topic_name, + '--namespace-name', self.namespace_name + ] + self.resource_group.args + _, _, retcode = vm_util.IssueCommand( + cmd, raise_on_failure=False) + if retcode != 0: + return False + return True + + def _delete_subscription(self): + """Handle Service Bus subscription deletion.""" + cmd = [ + azure.AZURE_PATH, 'servicebus', 'topic', 'subscription', 'delete', + '--name', self.subscription_name, '--topic-name', self.topic_name, + '--namespace-name', self.namespace_name + ] + self.resource_group.args + vm_util.IssueCommand(cmd) + + def _create_namespace(self): + """Creates an Azure Service Bus Namespace.""" + cmd = [ + azure.AZURE_PATH, 'servicebus', 'namespace', 'create', '--name', + self.namespace_name, '--location', self.location + ] + self.resource_group.args + vm_util.IssueCommand(cmd) + + def _namespace_exists(self) -> bool: + """Checks if our Service Bus Namespace exists.""" + cmd = [ + azure.AZURE_PATH, 'servicebus', 'namespace', 'show', '--name', + self.namespace_name + ] + self.resource_group.args + _, _, retcode = vm_util.IssueCommand(cmd, raise_on_failure=False) + if retcode != 0: + return False + return True + + def _delete_namespace(self): + """Deletes the Azure Service Bus namespace.""" + cmd = [ + azure.AZURE_PATH, 'servicebus', 'namespace', 'delete', '--name', + self.namespace_name + ] + self.resource_group.args + vm_util.IssueCommand(cmd) + + def _get_primary_connection_string(self): + """Gets Azure Service Bus Namespace connection string.""" + cmd = [ + azure.AZURE_PATH, 'servicebus', 'namespace', 'authorization-rule', + 'keys', 'list', '--name=RootManageSharedAccessKey', '--namespace-name', + self.namespace_name, '--query=primaryConnectionString', '-o=tsv' + ] + self.resource_group.args + output, stderror, retcode = vm_util.IssueCommand( + cmd, raise_on_failure=False) + if retcode != 0: + logging.warning( + 'Failed to get Service Bus Namespace connection string! %s', stderror) + return output.strip() + + def create_resource(self, create_function, exists_function): + create_function() + timeout = time.time() + TIMEOUT + + while not exists_function(): + if time.time() > timeout: + raise errors.Benchmarks.PrepareException( + 'Timeout when creating resource.') + time.sleep(SLEEP_TIME) + + def provision_resources(self): + """Handles provision of resources needed for Azure Service Bus benchmark.""" + + self.create_resource(self._create_namespace, self._namespace_exists) + self.create_resource(self._create_topic, self._topic_exists) + self.create_resource(self._create_subscription, self._subscription_exists) + + def prepare(self): + # Install/uploads common modules/files + super().prepare() + + # Install/uploads Azure specific modules/files. + self.client.RemoteCommand( + 'sudo pip3 install azure-servicebus', + ignore_failure=False) + self.client.PushDataFile(os.path.join( + MESSAGING_SERVICE_DATA_DIR, + 'azure_service_bus_client.py')) + + # Create resources on Azure + self.provision_resources() + + def run(self, benchmark_scenario: str, number_of_messages: str, + message_size: str) -> Dict[str, Any]: + connection_str = self._get_primary_connection_string() + command = (f'python3 -m azure_service_bus_client ' + f'--topic_name={self.topic_name} ' + f'--subscription_name={self.subscription_name} ' + f'--benchmark_scenario={benchmark_scenario} ' + f'--number_of_messages={number_of_messages} ' + f'--message_size={message_size} ' + f'--connection_str="{connection_str}" ') + results = self.client.RemoteCommand(command) + results = json.loads(results[0]) + return results + + def delete_resource(self, delete_function, exists_function): + delete_function() + timeout = time.time() + TIMEOUT + + while exists_function(): + if time.time() > timeout: + raise errors.Resource.CleanupError( + 'Timeout when deleting resource.') + time.sleep(SLEEP_TIME) + + def cleanup(self): + self.delete_resource(self._delete_subscription, self._subscription_exists) + self.delete_resource(self._delete_topic, self._topic_exists) + self.delete_resource(self._delete_namespace, self._namespace_exists) + diff --git a/tests/messaging_service_util_test.py b/tests/messaging_service_util_test.py index 5f85b487d7..29fa532d63 100644 --- a/tests/messaging_service_util_test.py +++ b/tests/messaging_service_util_test.py @@ -20,6 +20,13 @@ def testGetInstanceAWS(self, aws_instance): messaging_service_util.get_instance(mock_client, 'AWS') aws_instance.assert_called_with(mock_client) + @mock.patch( + 'perfkitbenchmarker.providers.azure.azure_service_bus.AzureServiceBus') + def testGetInstanceAzure(self, azure_instance): + mock_client = 'mock_client' + messaging_service_util.get_instance(mock_client, 'Azure') + azure_instance.assert_called_with(mock_client) + if __name__ == '__main__': unittest.main() diff --git a/tests/providers/azure/azure_service_bus_test.py b/tests/providers/azure/azure_service_bus_test.py new file mode 100644 index 0000000000..aa252ebc9f --- /dev/null +++ b/tests/providers/azure/azure_service_bus_test.py @@ -0,0 +1,293 @@ +"""Tests for azure_service_bus.""" +import os +import unittest + +from absl import flags +import mock +from perfkitbenchmarker import errors +from perfkitbenchmarker import vm_util +from perfkitbenchmarker.providers.azure import azure_network +from perfkitbenchmarker.providers.azure import azure_service_bus +from tests import pkb_common_test_case + +_REGION = 'eastus' +BENCHMARK_SCENARIO = 'pull_latency' +NUMBER_OF_MESSAGES = 10 +MESSAGE_SIZE = 10 +MESSAGING_SERVICE_DATA_DIR = 'messaging_service' + +FLAGS = flags.FLAGS + + +class AzureServiceBusTest(pkb_common_test_case.PkbCommonTestCase): + + @mock.patch.object(azure_network, 'GetResourceGroup') + def setUp(self, resource_group_mock): + super().setUp() + FLAGS.run_uri = 'uri' + FLAGS.zones = [_REGION] + resource_group_mock.return_value.args = ['mocked_args'] + self.client = mock.Mock() + self.servicebus = azure_service_bus.AzureServiceBus(self.client) + + def _MockIssueCommand(self, return_value): + return self.enter_context(mock.patch.object( + vm_util, 'IssueCommand', return_value=return_value)) + + def testCreateTopic(self): + # Don't actually issue a command. + return_value = [None, None, 0] + cmd = self._MockIssueCommand(return_value) + + self.servicebus._create_topic() + cmd = ' '.join(cmd.call_args[0][0]) + self.assertIn( + 'servicebus topic create --name ' + self.servicebus.topic_name + + ' --namespace-name ' + self.servicebus.namespace_name, cmd) + + def testTopicExists(self): + # Don't actually issue a command. + return_value = [None, None, 0] + self._MockIssueCommand(return_value) + + topic = self.servicebus._topic_exists() + self.assertTrue(topic) + + def testNotFoundTopic(self): + # Don't actually issue a command. + return_value = ['', '', 1] + self._MockIssueCommand(return_value) + + topic = self.servicebus._topic_exists() + self.assertFalse(topic) + + def testDeleteTopic(self): + # Don't actually issue a command. + return_value = [None, None, 0] + cmd = self._MockIssueCommand(return_value) + + self.servicebus._delete_topic() + cmd = ' '.join(cmd.call_args[0][0]) + self.assertIn( + 'servicebus topic delete --name ' + self.servicebus.topic_name + + ' --namespace-name ' + self.servicebus.namespace_name, cmd) + + def testCreateSubscription(self): + # Don't actually issue a command. + return_value = [None, None, 0] + cmd = self._MockIssueCommand(return_value) + + self.servicebus._create_subscription() + cmd = ' '.join(cmd.call_args[0][0]) + self.assertIn( + 'servicebus topic subscription create --name ' + + self.servicebus.subscription_name + ' --topic-name ' + + self.servicebus.topic_name + ' --namespace-name ' + + self.servicebus.namespace_name, cmd) + + def testSubscriptionExists(self): + # Don't actually issue a command. + return_value = [None, None, 0] + self._MockIssueCommand(return_value) + + subscription = self.servicebus._subscription_exists() + self.assertTrue(subscription) + + def testNotFoundSubscription(self): + # Don't actually issue a command. + return_value = ['', '', 1] + self._MockIssueCommand(return_value) + + subscription = self.servicebus._subscription_exists() + self.assertFalse(subscription) + + def testDeleteSubscription(self): + # Don't actually issue a command. + return_value = [None, None, 0] + cmd = self._MockIssueCommand(return_value) + + self.servicebus._delete_subscription() + cmd = ' '.join(cmd.call_args[0][0]) + self.assertIn( + 'servicebus topic subscription delete --name ' + + self.servicebus.subscription_name + ' --topic-name ' + + self.servicebus.topic_name + ' --namespace-name ' + + self.servicebus.namespace_name, cmd) + + def testCreateNamespace(self): + # Don't actually issue a command. + return_value = [None, None, 0] + cmd = self._MockIssueCommand(return_value) + + self.servicebus._create_namespace() + cmd = ' '.join(cmd.call_args[0][0]) + self.assertIn( + 'servicebus namespace create --name ' + self.servicebus.namespace_name + + ' --location ' + self.servicebus.location, cmd) + + def testNamespaceExists(self): + # Don't actually issue a command. + return_value = [None, None, 0] + self._MockIssueCommand(return_value) + + namespace = self.servicebus._namespace_exists() + self.assertTrue(namespace) + + def testNamespaceDoesntExist(self): + # Don't actually issue a command. + return_value = ['', '', 1] + self._MockIssueCommand(return_value) + + namespace = self.servicebus._namespace_exists() + self.assertFalse(namespace) + + def testDeleteNamespace(self): + # Don't actually issue a command. + return_value = [None, None, 0] + cmd = self._MockIssueCommand(return_value) + + self.servicebus._delete_namespace() + cmd = ' '.join(cmd.call_args[0][0]) + self.assertIn( + 'servicebus namespace delete --name ' + + self.servicebus.namespace_name, cmd) + + def testGetConnectionString(self): + # Don't actually issue a command. + return_value = ['', None, 0] + cmd = self._MockIssueCommand(return_value) + + self.servicebus._get_primary_connection_string() + cmd = ' '.join(cmd.call_args[0][0]) + self.assertIn( + 'servicebus namespace authorization-rule keys list ' + + '--name=RootManageSharedAccessKey --namespace-name ' + + self.servicebus.namespace_name + + ' --query=primaryConnectionString -o=tsv', cmd) + + @mock.patch.object(azure_service_bus.AzureServiceBus, '_create_namespace') + @mock.patch.object( + azure_service_bus.AzureServiceBus, + '_namespace_exists', + side_effect=[False, True]) + @mock.patch.object(azure_service_bus.AzureServiceBus, '_create_subscription') + @mock.patch.object( + azure_service_bus.AzureServiceBus, + '_subscription_exists', + side_effect=[False, True]) + @mock.patch.object(azure_service_bus.AzureServiceBus, '_create_topic') + @mock.patch.object( + azure_service_bus.AzureServiceBus, + '_topic_exists', + side_effect=[False, True]) + def testProvisionResources(self, topic_exists_mock, create_topic_mock, + subscription_exists_mock, + create_subscription_mock, + namespace_exists_mock, + create_namespace_mock): + self.servicebus.provision_resources() + + self.assertEqual(create_namespace_mock.call_count, 1) + self.assertEqual(namespace_exists_mock.call_count, 2) + self.assertEqual(create_subscription_mock.call_count, 1) + self.assertEqual(subscription_exists_mock.call_count, 2) + self.assertEqual(create_topic_mock.call_count, 1) + self.assertEqual(topic_exists_mock.call_count, 2) + + @mock.patch.object(azure_service_bus.AzureServiceBus, '_create_namespace') + @mock.patch.object( + azure_service_bus.AzureServiceBus, + '_namespace_exists', + return_value=False) + @mock.patch.object(azure_service_bus.AzureServiceBus, '_create_subscription') + @mock.patch.object( + azure_service_bus.AzureServiceBus, + '_subscription_exists', + return_value=False) + @mock.patch.object(azure_service_bus.AzureServiceBus, '_create_topic') + @mock.patch.object( + azure_service_bus.AzureServiceBus, '_topic_exists', return_value=False) + def testProvisionResourcesException( + self, topic_exists_mock, create_topic_mock, subscription_exists_mock, + create_subscription_mock, create_namespace_mock, namespace_exists_mock): + self.assertRaises(errors.Benchmarks.PrepareException, + self.servicebus.provision_resources) + + @mock.patch.object(azure_service_bus.AzureServiceBus, 'provision_resources') + def testPrepare(self, provision_mock): + return_value = [None, None, 0] + self._MockIssueCommand(return_value) + + sdk_cmd = ('sudo pip3 install azure-servicebus') + datafile_path = os.path.join(MESSAGING_SERVICE_DATA_DIR, + 'azure_service_bus_client.py') + + self.servicebus.prepare() + self.client.RemoteCommand.assert_called_with(sdk_cmd, ignore_failure=False) + self.client.PushDataFile.assert_called_with(datafile_path) + provision_mock.assert_called() + + @mock.patch.object( + azure_service_bus.AzureServiceBus, + '_get_primary_connection_string', + return_value='mocked_string') + def testRun(self, get_connection_string_mock): + + return_value = ['{"mock1": 1}', None] + self.client.RemoteCommand.return_value = return_value + remote_run_cmd = ( + f'python3 -m azure_service_bus_client ' + f'--topic_name={self.servicebus.topic_name} ' + f'--subscription_name={self.servicebus.subscription_name} ' + f'--benchmark_scenario={BENCHMARK_SCENARIO} ' + f'--number_of_messages={NUMBER_OF_MESSAGES} ' + f'--message_size={MESSAGE_SIZE} ' + f'--connection_str="mocked_string" ') + + self.servicebus.run(BENCHMARK_SCENARIO, NUMBER_OF_MESSAGES, MESSAGE_SIZE) + self.client.RemoteCommand.assert_called_with(remote_run_cmd) + + @mock.patch.object(azure_service_bus.AzureServiceBus, '_delete_namespace') + @mock.patch.object( + azure_service_bus.AzureServiceBus, + '_namespace_exists', + side_effect=[True, False]) + @mock.patch.object(azure_service_bus.AzureServiceBus, '_delete_subscription') + @mock.patch.object( + azure_service_bus.AzureServiceBus, + '_subscription_exists', + side_effect=[True, False]) + @mock.patch.object(azure_service_bus.AzureServiceBus, '_delete_topic') + @mock.patch.object( + azure_service_bus.AzureServiceBus, + '_topic_exists', + side_effect=[True, False]) + def testCleanup(self, topic_exists_mock, delete_topic_mock, + subscription_exists_mock, delete_subscription_mock, + namespace_exists_mock, delete_namespace_mock): + self.servicebus.cleanup() + self.assertEqual(delete_namespace_mock.call_count, 1) + self.assertEqual(namespace_exists_mock.call_count, 2) + self.assertEqual(delete_subscription_mock.call_count, 1) + self.assertEqual(subscription_exists_mock.call_count, 2) + self.assertEqual(delete_topic_mock.call_count, 1) + self.assertEqual(topic_exists_mock.call_count, 2) + + @mock.patch.object(azure_service_bus.AzureServiceBus, '_delete_namespace') + @mock.patch.object( + azure_service_bus.AzureServiceBus, '_namespace_exists', return_value=True) + @mock.patch.object(azure_service_bus.AzureServiceBus, '_delete_subscription') + @mock.patch.object( + azure_service_bus.AzureServiceBus, + '_subscription_exists', + return_value=True) + @mock.patch.object(azure_service_bus.AzureServiceBus, '_delete_topic') + @mock.patch.object( + azure_service_bus.AzureServiceBus, '_topic_exists', return_value=True) + def testCleanupException(self, topic_exists_mock, delete_topic_mock, + subscription_exists_mock, delete_subscription_mock, + namespace_exists_mock, delete_namespace_mock): + self.assertRaises(errors.Resource.CleanupError, self.servicebus.cleanup) + +if __name__ == '__main__': + unittest.main()