From 62b5b693e100683146eab95daeab1246f10c9afe Mon Sep 17 00:00:00 2001 From: RamuniN Date: Sat, 28 Oct 2023 01:32:50 +0100 Subject: [PATCH 1/3] refactored to use sqs funcs from fsd-utils --- .env.development | 6 +- _helpers/import_application.py | 16 ++-- config/envs/default.py | 22 +++-- config/envs/unit_testing.py | 4 +- requirements-dev.txt | 44 ++++++--- requirements.txt | 8 +- services/__init__.py | 3 - services/aws.py | 164 +-------------------------------- tests/test_sqs_helpers.py | 38 +------- 9 files changed, 73 insertions(+), 232 deletions(-) diff --git a/.env.development b/.env.development index 833a8785..18f3a605 100644 --- a/.env.development +++ b/.env.development @@ -10,6 +10,8 @@ AWS_ACCESS_KEY_ID=FSDIOSFODNN7EXAMPLE AWS_SECRET_ACCESS_KEY=fsdlrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY AWS_ENDPOINT_OVERRIDE=http://localhost:4566 AWS_REGION=eu-west-2 -AWS_SQS_QUEUE_NAME=fsd-queue -AWS_DLQ_QUEUE_NAME=fsd-dlq +AWS_SQS_IMPORT_APP_QUEUE_NAME=import-queue +AWS_DLQ_IMPORT_APP_QUEUE_NAME=import-dlq AWS_DLQ_MAX_RECIEVE_COUNT=3 +AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL=http://localhost:4566/000000000000/import-queue +AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL=http://localhost:4566/000000000000/import-queue-dlq diff --git a/_helpers/import_application.py b/_helpers/import_application.py index 78d7cebe..87394481 100644 --- a/_helpers/import_application.py +++ b/_helpers/import_application.py @@ -8,17 +8,18 @@ ) from db.queries import bulk_insert_application_record from flask import current_app -from services import _SQS_QUEUE_URL -from services import delete_messages -from services import receive_messages +from services import _SQS_CLIENT def import_applications_from_queue(): batch_size = Config.SQS_BATCH_SIZE visibility_time = Config.SQS_VISIBILITY_TIME wait_time = Config.SQS_WAIT_TIME - application_messages = receive_messages( - _SQS_QUEUE_URL, batch_size, visibility_time, wait_time + application_messages = _SQS_CLIENT.receive_messages( + Config.AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL, + batch_size, + visibility_time, + wait_time, ) if not application_messages: @@ -61,7 +62,10 @@ def import_applications_from_queue(): ): reciept_handles_to_delete.append(message["ReceiptHandle"]) if reciept_handles_to_delete: - delete_messages(_SQS_QUEUE_URL, reciept_handles_to_delete) + _SQS_CLIENT.delete_messages( + Config.AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL, + reciept_handles_to_delete, + ) for message in application_messages: receive_count = int( diff --git a/config/envs/default.py b/config/envs/default.py index a74b05fa..7583fe77 100644 --- a/config/envs/default.py +++ b/config/envs/default.py @@ -50,8 +50,10 @@ class DefaultConfig: # --------------- if "PRIMARY_QUEUE_URL" in environ: AWS_REGION = environ.get("AWS_REGION") - AWS_PRIMARY_QUEUE_URL = environ.get("PRIMARY_QUEUE_URL") - AWS_SECONDARY_QUEUE_URL = environ.get("DEAD_LETTER_QUEUE_URL") + AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL = environ.get("PRIMARY_QUEUE_URL") + AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL = environ.get( + "DEAD_LETTER_QUEUE_URL" + ) elif "VCAP_SERVICES" in environ: vcap_services = json.loads(environ["VCAP_SERVICES"]) if "aws-sqs-queue" in vcap_services: @@ -59,14 +61,22 @@ class DefaultConfig: AWS_REGION = sqs_credentials["aws_region"] AWS_ACCESS_KEY_ID = sqs_credentials["aws_access_key_id"] AWS_SECRET_ACCESS_KEY = sqs_credentials["aws_secret_access_key"] - AWS_PRIMARY_QUEUE_URL = sqs_credentials["primary_queue_url"] - AWS_SECONDARY_QUEUE_URL = sqs_credentials["secondary_queue_url"] + AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL = sqs_credentials[ + "primary_queue_url" + ] + AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL = sqs_credentials[ + "secondary_queue_url" + ] else: AWS_ACCESS_KEY_ID = environ.get("AWS_ACCESS_KEY_ID") AWS_SECRET_ACCESS_KEY = environ.get("AWS_SECRET_ACCESS_KEY") AWS_REGION = environ.get("AWS_REGION") - AWS_PRIMARY_QUEUE_URL = "" - AWS_SECONDARY_QUEUE_URL = "" + AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL = environ.get( + "AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL" + ) + AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL = environ.get( + "AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL" + ) AWS_DLQ_MAX_RECIEVE_COUNT = int( environ.get("AWS_DLQ_MAX_RECIEVE_COUNT", 3) ) diff --git a/config/envs/unit_testing.py b/config/envs/unit_testing.py index 9e695f12..f2a8db19 100644 --- a/config/envs/unit_testing.py +++ b/config/envs/unit_testing.py @@ -29,5 +29,5 @@ class UnitTestingConfig(DefaultConfig): AWS_ACCESS_KEY_ID = "test_access_id" AWS_SECRET_ACCESS_KEY = "test_secret_key" # pragma: allowlist secret AWS_REGION = "eu-west-2" - AWS_PRIMARY_QUEUE_URL = "test_primary_url" - AWS_SECONDARY_QUEUE_URL = "test_secondary_url" + AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL = "test_primary_url" + AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL = "test_secondary_url" diff --git a/requirements-dev.txt b/requirements-dev.txt index 8b011da3..254b3bf6 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -30,6 +30,10 @@ babel==2.11.0 # flask-babel bandit==1.7.4 # via -r requirements-dev.in +beautifulsoup4==4.12.2 + # via + # -r requirements.txt + # funding-service-design-utils black==22.10.0 # via -r requirements-dev.in blinker==1.5 @@ -81,13 +85,6 @@ clickclick==20.10.2 # via # -r requirements.txt # connexion -colorama==0.4.6 - # via - # -r requirements.txt - # bandit - # build - # click - # pytest colored==1.4.4 # via -r requirements-dev.in commonmark==0.9.1 @@ -112,6 +109,8 @@ deepdiff==6.2.3 # via -r requirements-dev.in distlib==0.3.6 # via virtualenv +exceptiongroup==1.1.3 + # via pytest filelock==3.8.0 # via virtualenv flask==2.2.3 @@ -153,13 +152,15 @@ flupy==1.2.0 # via # -r requirements.txt # alembic-utils -funding-service-design-utils==2.0.1 - # via -r requirements.txt +# TODO: update fsd-utils version after below branch is merged +git+https://github.com/communitiesuk/funding-service-design-utils.git@add-init-file-sqs +# funding-service-design-utils==2.0.29 +# # via -r requirements.txt gitdb==4.0.9 # via gitpython gitpython==3.1.29 # via bandit -greenlet==2.0.2 +greenlet==3.0.1 # via # -r requirements.txt # sqlalchemy @@ -229,6 +230,7 @@ markupsafe==2.1.1 # -r requirements.txt # jinja2 # mako + # sentry-sdk # werkzeug marshmallow==3.19.0 # via @@ -397,6 +399,10 @@ ruamel-yaml==0.17.21 # via # -r requirements.txt # prance +ruamel-yaml-clib==0.2.8 + # via + # -r requirements.txt + # ruamel-yaml s3transfer==0.6.0 # via # -r requirements.txt @@ -405,7 +411,7 @@ semver==2.13.0 # via # -r requirements.txt # prance -sentry-sdk[flask]==1.11.1 +sentry-sdk[flask]==1.31.0 # via # -r requirements.txt # funding-service-design-utils @@ -421,6 +427,10 @@ six==1.16.0 # thrift smmap==5.0.0 # via gitdb +soupsieve==2.5 + # via + # -r requirements.txt + # beautifulsoup4 sqlalchemy[mypy]==1.4.48 # via # -r requirements.txt @@ -451,6 +461,14 @@ thrift==0.16.0 # via # -r requirements.txt # flipper-client +tomli==2.0.1 + # via + # -r requirements.txt + # black + # build + # mypy + # pep517 + # pytest typing-extensions==4.4.0 # via # -r requirements.txt @@ -460,10 +478,6 @@ typing-extensions==4.4.0 # mypy # sqlalchemy-stubs # sqlalchemy2-stubs -tzdata==2023.3 - # via - # -r requirements.txt - # tzlocal tzlocal==5.0.1 # via # -r requirements.txt diff --git a/requirements.txt b/requirements.txt index 0ae3c25e..99b3265c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -86,8 +86,12 @@ flipper-client==1.3.1 # via funding-service-design-utils flupy==1.2.0 # via alembic-utils -funding-service-design-utils==2.0.26 - # via -r requirements.in +# TODO: update fsd-utils version after below branch is merged +git+https://github.com/communitiesuk/funding-service-design-utils.git@add-init-file-sqs +# funding-service-design-utils==2.0.29 +# # via -r requirements.txt +greenlet==3.0.1 + # via sqlalchemy gunicorn==20.1.0 # via funding-service-design-utils idna==3.4 diff --git a/services/__init__.py b/services/__init__.py index 8e3d8f4a..52d92e37 100644 --- a/services/__init__.py +++ b/services/__init__.py @@ -1,4 +1 @@ from .aws import _SQS_CLIENT # noqa -from .aws import _SQS_QUEUE_URL # noqa -from .aws import delete_messages # noqa -from .aws import receive_messages # noqa diff --git a/services/aws.py b/services/aws.py index 31944947..b9cceeb6 100644 --- a/services/aws.py +++ b/services/aws.py @@ -1,179 +1,21 @@ -from datetime import datetime from os import getenv -from uuid import uuid4 -import boto3 -from botocore.exceptions import ClientError from config import Config +from fsd_utils.services.aws import SQSClient if ( getenv("PRIMARY_QUEUE_URL", "Primary Queue URL Not Set") == "Primary Queue URL Not Set" ): - _SQS_CLIENT = boto3.client( - "sqs", + _SQS_CLIENT = SQSClient( aws_access_key_id=Config.AWS_ACCESS_KEY_ID, aws_secret_access_key=Config.AWS_SECRET_ACCESS_KEY, region_name=Config.AWS_REGION, endpoint_url=getenv("AWS_ENDPOINT_OVERRIDE", None), ) else: - _SQS_CLIENT = boto3.client( - "sqs", + _SQS_CLIENT = SQSClient( region_name=Config.AWS_REGION, endpoint_url=getenv("AWS_ENDPOINT_OVERRIDE", None), ) - -_SQS_QUEUE_URL = ( - Config.AWS_PRIMARY_QUEUE_URL - or _SQS_CLIENT.get_queue_url( - QueueName=getenv("AWS_SQS_QUEUE_NAME", "fsd-queue"), - )["QueueUrl"] -) - - -def pack_message(msg_body): - return { - "body": msg_body, - "attributes": { - "id": {"StringValue": str(uuid4()), "DataType": "String"}, - "datetime": { - "StringValue": str(datetime.now()), - "DataType": "String", - }, - }, - } - - -def unpack_message(msg): - return ( - msg["MessageAttributes"]["id"]["StringValue"], - msg["Body"], - msg["MessageAttributes"]["datetime"]["StringValue"], - ) - - -def submit_message(queue_url, messages, DelaySeconds=1): - """ - Send a batch of messages in a single request to an SQS queue. - This request may return overall success even when some messages were not sent. - The caller must inspect the Successful and Failed lists in the response and - resend any failed messages. - - :param queue_url: SQS Queue url. - :param queue: The queue to receive the messages. - :param messages: The messages to send to the queue. These are simplified to - contain only the message body and attributes. - :return: The response from SQS that contains the list of successful and failed - messages. - """ - try: - entries = [ - { - "Id": str(ind), - "MessageBody": msg["body"], - "MessageAttributes": msg["attributes"], - "DelaySeconds": DelaySeconds, - } - for ind, msg in enumerate(messages) - ] - response = _SQS_CLIENT.send_message_batch( - QueueUrl=queue_url, - Entries=entries, - ) - if "Successful" in response: - for msg_meta in response["Successful"]: - print( - f"Message sent to the queue {_SQS_QUEUE_URL}, MessageId: {msg_meta['MessageId']}" - ) - if "Failed" in response: - for msg_meta in response["Failed"]: - print( - f"Failed to send messages to queue: {_SQS_QUEUE_URL}, " - f"attributes {messages[int(msg_meta['Id'])]['attributes']}" - ) - except ClientError as error: - print(f"Send messages failed to queue: {_SQS_QUEUE_URL}") - raise error - else: - return response - - -def receive_messages(queue_url, max_number, visibility_time=1, wait_time=1): - """ - Receive a batch of messages in a single request from an SQS queue. - - :param queue_url: SQS Queue url - :param max_number: The maximum number of messages to receive. The actual number - of messages received might be less. - :param visibility_time: The maximum time for message to temporarily invisible to other receivers. - This gives the initial receiver a chance to process the message. If the receiver - successfully processes and deletes the message within the visibility timeout, - the message is removed from the queue. - :param wait_time: The maximum time to wait (in seconds) before returning. When - this number is greater than zero, long polling is used. This - can result in reduced costs and fewer false empty responses. - :return: The list of Message objects received. These each contain the body - of the message and metadata and custom attributes. - """ - try: - response = _SQS_CLIENT.receive_message( - QueueUrl=queue_url, - AttributeNames=["SentTimestamp", "ApproximateReceiveCount"], - MessageAttributeNames=["All"], - MaxNumberOfMessages=max_number, - VisibilityTimeout=visibility_time, - WaitTimeSeconds=wait_time, - ) - if "Messages" in response.keys(): - messages = response["Messages"] - elif response["ResponseMetadata"]["HTTPStatusCode"] == 200: - print(f"No more messages available in queue: {_SQS_QUEUE_URL}") - return [] - - for msg in messages: - print( - f"Received message ID: {msg['MessageId']}, Attributes: {msg['MessageAttributes']}" - ) - except Exception as error: - print( - f"Couldn't receive messages from queue: {_SQS_QUEUE_URL} Error: {error}" - ) - raise error - else: - return messages - - -def delete_messages(queue_url, message_receipt_handles): - """ - Delete a batch of messages from a queue in a single request. - - :param queue_url: SQS Queue url - :param message_receipt_handles: The list of messages handles to delete. - :return: The response from SQS that contains the list of successful and failed - message deletions. - """ - try: - entries = [ - {"Id": str(ind), "ReceiptHandle": receipt_handle} - for ind, receipt_handle in enumerate(message_receipt_handles) - ] - response = _SQS_CLIENT.delete_message_batch( - QueueUrl=queue_url, Entries=entries - ) - - if "Successful" in response: - for msg_meta in response["Successful"]: - print( - f"Deleted {message_receipt_handles[int(msg_meta['Id'])]}" - ) - if "Failed" in response: - for msg_meta in response["Failed"]: - print( - f"Could not delete {message_receipt_handles[int(msg_meta['Id'])]}" - ) - except ClientError: - print(f"Couldn't delete message from queue {_SQS_QUEUE_URL}") - else: - return response diff --git a/tests/test_sqs_helpers.py b/tests/test_sqs_helpers.py index c02065b4..0b57dfd3 100644 --- a/tests/test_sqs_helpers.py +++ b/tests/test_sqs_helpers.py @@ -2,12 +2,9 @@ import pytest from _helpers.import_application import import_applications_from_queue -from config.envs.unit_testing import UnitTestingConfig from config.mappings.assessment_mapping_fund_round import ( fund_round_mapping_config, ) -from services import delete_messages -from services import receive_messages from tests._helpers import row_data @@ -34,8 +31,8 @@ def mock_sqs_recieve_message(request, mocker): ) mocker.patch( - "services._SQS_CLIENT.receive_message", - return_value={"Messages": messages}, + "_helpers.import_application._SQS_CLIENT.receive_messages", + return_value=messages, ) yield messages @@ -44,7 +41,7 @@ def mock_sqs_recieve_message(request, mocker): def mock_sqs_delete_message(request, mocker): appcount = request.node.get_closest_marker("appcount").args[0] mocker.patch( - "services._SQS_CLIENT.delete_message_batch", + "_helpers.import_application._SQS_CLIENT.delete_messages", return_value={ "Successful": [{"Id": str(count)} for count in range(appcount)], "Failed": [], @@ -74,35 +71,6 @@ def mock_bulk_insert_application_records(mocker, mock_sqs_recieve_message): class TestSQSFunctions(object): - @pytest.mark.fundround("NSTFR2") - @pytest.mark.appcount(3) - def test_receive_messages(self, request, mock_sqs_recieve_message): - appcount = request.node.get_closest_marker("appcount").args[0] - - # Call the function - messages = receive_messages( - UnitTestingConfig.AWS_PRIMARY_QUEUE_URL, max_number=2 - ) - - # Assertions - assert len(messages) == appcount - assert messages[0]["MessageId"] == "1" - - @pytest.mark.appcount(3) - def test_delete_messages(self, request, mock_sqs_delete_message): - appcount = request.node.get_closest_marker("appcount").args[0] - - # Call the function - receipt_handles = [ - "receipt_handle_" + str(count) for count in range(appcount) - ] - response = delete_messages( - UnitTestingConfig.AWS_PRIMARY_QUEUE_URL, receipt_handles - ) - - # Assertions - assert len(response["Successful"]) == appcount - @pytest.mark.fundround("NSTFR2") @pytest.mark.appcount(3) def test_import_application_queue( From 46790bd8c65747e6aea19b0adaa8affd0f51501f Mon Sep 17 00:00:00 2001 From: RamuniN Date: Mon, 30 Oct 2023 12:06:12 +0000 Subject: [PATCH 2/3] bump utils version --- requirements-dev.txt | 6 ++---- requirements.txt | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/requirements-dev.txt b/requirements-dev.txt index 254b3bf6..e6a6c1f2 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -152,10 +152,8 @@ flupy==1.2.0 # via # -r requirements.txt # alembic-utils -# TODO: update fsd-utils version after below branch is merged -git+https://github.com/communitiesuk/funding-service-design-utils.git@add-init-file-sqs -# funding-service-design-utils==2.0.29 -# # via -r requirements.txt +funding-service-design-utils==2.0.30 + # via -r requirements.txt gitdb==4.0.9 # via gitpython gitpython==3.1.29 diff --git a/requirements.txt b/requirements.txt index 99b3265c..84421f85 100644 --- a/requirements.txt +++ b/requirements.txt @@ -86,10 +86,8 @@ flipper-client==1.3.1 # via funding-service-design-utils flupy==1.2.0 # via alembic-utils -# TODO: update fsd-utils version after below branch is merged -git+https://github.com/communitiesuk/funding-service-design-utils.git@add-init-file-sqs -# funding-service-design-utils==2.0.29 -# # via -r requirements.txt +funding-service-design-utils==2.0.30 + # via -r requirements.txt greenlet==3.0.1 # via sqlalchemy gunicorn==20.1.0 From 7646765f00bfe5a477e82e419f4ec7e70aacbf2c Mon Sep 17 00:00:00 2001 From: RamuniN Date: Mon, 30 Oct 2023 12:34:18 +0000 Subject: [PATCH 3/3] renamed sqs tests --- ...st_sqs_helpers.py => test_import_queue.py} | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) rename tests/{test_sqs_helpers.py => test_import_queue.py} (81%) diff --git a/tests/test_sqs_helpers.py b/tests/test_import_queue.py similarity index 81% rename from tests/test_sqs_helpers.py rename to tests/test_import_queue.py index 0b57dfd3..11c5cd62 100644 --- a/tests/test_sqs_helpers.py +++ b/tests/test_import_queue.py @@ -70,20 +70,18 @@ def mock_bulk_insert_application_records(mocker, mock_sqs_recieve_message): yield -class TestSQSFunctions(object): - @pytest.mark.fundround("NSTFR2") - @pytest.mark.appcount(3) - def test_import_application_queue( - self, - request, - mock_sqs_recieve_message, - mock_sqs_delete_message, - mock_bulk_insert_application_records, - ): - appcount = request.node.get_closest_marker("appcount").args[0] +@pytest.mark.fundround("NSTFR2") +@pytest.mark.appcount(3) +def test_import_application_queue( + request, + mock_sqs_recieve_message, + mock_sqs_delete_message, + mock_bulk_insert_application_records, +): + appcount = request.node.get_closest_marker("appcount").args[0] - # Call the function - response = import_applications_from_queue() + # Call the function + response = import_applications_from_queue() - # Assertions - assert len(response) == appcount + # Assertions + assert len(response) == appcount