Skip to content
This repository has been archived by the owner on Nov 29, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into tferns-adamw-application-store-email-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
tferns authored Nov 1, 2023
2 parents f61fbaa + 8fdf5ec commit 6d40b39
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 121 deletions.
6 changes: 4 additions & 2 deletions .env.development
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ AWS_SECRET_ACCESS_KEY=fsdlrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
AWS_BUCKET_NAME=fsd-bucket
AWS_ENDPOINT_OVERRIDE=http://localhost:4566
AWS_REGION=eu-west-2
AWS_SQS_QUEUE_NAME=fsd-queue
AWS_DLQ_QUEUE_NAME=fsd-dlq
AWS_SQS_IMPORT_APP_QUEUE_NAME=import-queue
AWS_DLQ_IMPORT_APP_QUEUE_NAME=import-dlq
AWS_DLQ_MAX_RECIEVE_COUNT=3
AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL=http://localhost:4566/000000000000/import-queue
AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL=http://localhost:4566/000000000000/import-queue-dlq
16 changes: 12 additions & 4 deletions api/routes/application/routes.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time
from typing import Optional
from uuid import uuid4

from _helpers import get_blank_forms
from _helpers import order_applications
Expand Down Expand Up @@ -27,7 +28,7 @@
from external_services import get_account
from external_services import get_fund
from external_services import get_round
from external_services.aws import submit_message_to_queue
from external_services.aws import _SQS_CLIENT
from external_services.exceptions import NotificationError
from external_services.models.notification import Notification
from flask import current_app
Expand Down Expand Up @@ -184,9 +185,16 @@ def submit(self, application_id):
# assessment service to import the application
# (currently assessment is using a CRON timer to pick up messages,
# not a webhook for triggers)
submit_message_to_queue(
application_with_form_json,
application_attributes,

# TODO: (FS-3703) Revisit this part after AWS migration
# 'MessageGroupId' & 'MessageDeduplicationId' are mandatary parameters to be provided on PAAS,
# while they are not acceptable parameters on localstack queue
_SQS_CLIENT.submit_single_message(
queue_url=Config.AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL,
message=application_with_form_json,
extra_attributes=application_attributes,
message_group_id="import_applications_group",
message_deduplication_id=str(uuid4()),
)

if should_send_email:
Expand Down
18 changes: 14 additions & 4 deletions api/routes/queues/routes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# from config import Config
from uuid import uuid4

from config import Config
from db.queries import get_application
from external_services.aws import submit_message_to_queue
from external_services.aws import _SQS_CLIENT
from flask.views import MethodView

# from flask import request
Expand All @@ -22,9 +25,16 @@ def post_submitted_application_to_assessment(self, application_id=None):
# trigger the assessment service to import the application
# (currently assessment is using a CRON timer to pick up messages,
# not a webhook for triggers)
message_submitted_id = submit_message_to_queue(
application_with_form_json,
application_attributes,

# TODO: (FS-3703) Revisit this part after AWS migration
# 'MessageGroupId' & 'MessageDeduplicationId' are mandatary parameters to be provided on PAAS,
# while they are not acceptable parameters on localstack queue
message_submitted_id = _SQS_CLIENT.submit_single_message(
queue_url=Config.AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL,
message=application_with_form_json,
extra_attributes=application_attributes,
message_group_id="import_applications_group",
message_deduplication_id=str(uuid4()),
)

return f"Message queued, message_id is: {message_submitted_id}.", 201
Expand Down
8 changes: 6 additions & 2 deletions config/envs/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@ class DefaultConfig:
)
AWS_BUCKET_NAME = os.environ.get("AWS_BUCKET_NAME")
AWS_REGION = AWS_SQS_REGION = os.environ.get("AWS_REGION")
AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL = ""
AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL = ""
AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL = os.environ.get(
"AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL"
)
AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL = os.environ.get(
"AWS_SQS_IMPORT_APP_SECONDARY_QUEUE_URL"
)

# Account Store Endpoints
ACCOUNTS_ENDPOINT = "/accounts"
Expand Down
102 changes: 3 additions & 99 deletions external_services/aws.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import json
from collections import namedtuple
from datetime import datetime
from os import getenv
from uuid import uuid4

import boto3
from botocore.exceptions import ClientError
from config import Config
from fsd_utils.services.aws import SQSClient

_KEY_PARTS = ("application_id", "form", "path", "component_id", "filename")

Expand All @@ -21,8 +18,7 @@
region_name=Config.AWS_REGION,
endpoint_url=getenv("AWS_ENDPOINT_OVERRIDE", None),
)
_SQS_CLIENT = boto3.client(
"sqs",
_SQS_CLIENT = SQSClient(
aws_access_key_id=Config.AWS_SQS_ACCESS_KEY_ID,
aws_secret_access_key=Config.AWS_SQS_SECRET_ACCESS_KEY,
region_name=Config.AWS_SQS_REGION,
Expand All @@ -34,8 +30,7 @@
region_name=Config.AWS_REGION,
endpoint_url=getenv("AWS_ENDPOINT_OVERRIDE", None),
)
_SQS_CLIENT = boto3.client(
"sqs",
_SQS_CLIENT = SQSClient(
region_name=Config.AWS_REGION,
endpoint_url=getenv("AWS_ENDPOINT_OVERRIDE", None),
)
Expand All @@ -55,94 +50,3 @@ def list_files_by_prefix(prefix: str) -> list[FileData]:
for key in [file["Key"] for file in contents]
if len(key_parts := key.split("/")) == len(_KEY_PARTS)
]


def get_queues(prefix=None):
"""
Gets a list of SQS queues. When a prefix is specified, only queues with names
that start with the prefix are returned.
:param prefix: The prefix used to restrict the list of returned queues.
:return: A list of Queue names.
"""
if prefix:
queues = _SQS_CLIENT.list_queues(QueueNamePrefix=prefix)["QueueUrls"]
else:
queues = _SQS_CLIENT.list_queues()["QueueUrls"]
if queues:
print(f"Got queues: {', '.join([q for q in queues])}")
queue_names = [url.split("/")[-1] for url in queues]
return queue_names
else:
print("No queues found.")
return []


def remove_queue(queue_url):
"""
Removes an SQS queue. When run against an AWS account, it can take up to
60 seconds before the queue is actually deleted.
:param queue: The queue to delete.
:return: None
"""
try:
_SQS_CLIENT.delete_queue(QueueUrl=queue_url)
print(f"Deleted queue with URL={queue_url}.")
except ClientError as error:
print(f"Couldn't delete queue with URL={queue_url}!")
raise error


def _get_queue_url(sqs_client, queue_name):
response = sqs_client.get_queue_url(
QueueName=queue_name,
)
return response["QueueUrl"]


def submit_message_to_queue(message, extra_attributes: dict = None):
try:
SQS_CUSTOM_ATTRIBUTES = {
"message_created_at": {
"StringValue": str(datetime.now()),
"DataType": "String",
},
}
# add extra message attributes (if provided)
if extra_attributes:
for key, value in extra_attributes.items():
SQS_CUSTOM_ATTRIBUTES[key] = value

queue_url = Config.AWS_SQS_IMPORT_APP_PRIMARY_QUEUE_URL or _get_queue_url(
_SQS_CLIENT,
getenv("AWS_SQS_QUEUE_NAME", "fsd-queue"),
)
print(f"Attempting to place message on queue '{queue_url}'.")

# TODO: Revisit this part after AWS migration
if "docker" in queue_url or "local" in queue_url: # if running on localstack
response = _SQS_CLIENT.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message),
MessageAttributes=SQS_CUSTOM_ATTRIBUTES,
)
else:
response = _SQS_CLIENT.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message),
MessageAttributes=SQS_CUSTOM_ATTRIBUTES,
MessageGroupId="import_applications_group",
MessageDeduplicationId=str(uuid4()),
)
message_id = response["MessageId"]
print(f"Message (id: {message_id}) submitted to queue: {queue_url}.")
return message_id
except Exception as e:
print(
"Error whilst staging onto queue"
f" '{queue_url}', message with"
f" attributes '{str(extra_attributes)}'."
f" Error : {str(e)}"
)
return str(e), 500, {"x-error": "Error"}
5 changes: 3 additions & 2 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ flipper-client==1.3.1
# via
# -r requirements.txt
# funding-service-design-utils
funding-service-design-utils==2.0.18
funding-service-design-utils==2.0.30
# via -r requirements.txt
gitdb==4.0.9
# via gitpython
Expand Down Expand Up @@ -230,6 +230,7 @@ markupsafe==2.1.1
# -r requirements.txt
# jinja2
# mako
# sentry-sdk
# werkzeug
marshmallow==3.19.0
# via
Expand Down Expand Up @@ -400,7 +401,7 @@ semver==2.13.0
# via
# -r requirements.txt
# prance
sentry-sdk[flask]==1.16.0
sentry-sdk[flask]==1.32.0
# via
# -r requirements.txt
# funding-service-design-utils
Expand Down
7 changes: 4 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ flask-sqlalchemy==3.0.3
# funding-service-design-utils
flipper-client==1.3.1
# via funding-service-design-utils
funding-service-design-utils==2.0.18
# via -r requirements.in
funding-service-design-utils==2.0.30
# via -r requirements.txt
greenlet==2.0.2
# via sqlalchemy
gunicorn==20.1.0
Expand Down Expand Up @@ -131,6 +131,7 @@ markupsafe==2.1.1
# via
# jinja2
# mako
# sentry-sdk
# werkzeug
marshmallow==3.19.0
# via marshmallow-sqlalchemy
Expand Down Expand Up @@ -220,7 +221,7 @@ s3transfer==0.6.0
# via boto3
semver==2.13.0
# via prance
sentry-sdk[flask]==1.16.0
sentry-sdk[flask]==1.32.0
# via funding-service-design-utils
six==1.16.0
# via
Expand Down
6 changes: 3 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ def mock_submit_message_to_queue(mocker, request):
function_calls_to_mock_marker.args[0]
if function_calls_to_mock_marker
else [
"api.routes.application.routes.submit_message_to_queue",
"api.routes.queues.routes.submit_message_to_queue",
"api.routes.application.routes._SQS_CLIENT.submit_single_message",
"api.routes.queues.routes._SQS_CLIENT.submit_single_message",
]
)

Expand All @@ -386,4 +386,4 @@ def mock_submit_message_to_queue(mocker, request):
if function_calls_to_mock_marker:
for mock in mocked_calls:
assert mock.called == True # noqa
assert len(mock.call_args[0]) == 2
assert len(mock.call_args[1]) == 5
4 changes: 2 additions & 2 deletions tests/test_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ def test_put_returns_400_on_submitted_application(


@pytest.mark.function_calls_to_mock(
["api.routes.application.routes.submit_message_to_queue"]
["api.routes.application.routes._SQS_CLIENT.submit_single_message"]
)
@pytest.mark.apps_to_insert([test_application_data[0]])
def test_successful_submitted_application(
Expand Down Expand Up @@ -611,7 +611,7 @@ def test_stage_unsubmitted_application_to_queue_fails(


@pytest.mark.function_calls_to_mock(
["api.routes.queues.routes.submit_message_to_queue"]
["api.routes.queues.routes._SQS_CLIENT.submit_single_message"]
)
@pytest.mark.apps_to_insert([test_application_data[0]])
def test_stage_submitted_application_to_queue_fails(
Expand Down

0 comments on commit 6d40b39

Please sign in to comment.