Create consignment indexing lambda for opensearch #688

Merged
data_management/opensearch_indexer/Dockerfile (1 addition, 1 deletion)

@@ -10,4 +10,4 @@ RUN pip install -r requirements.txt

COPY opensearch_indexer/ ${LAMBDA_TASK_ROOT}/opensearch_indexer

CMD [ "opensearch_indexer.lambda_function.lambda_handler" ]
CMD [ "opensearch_indexer.index_consignment.lambda_function.lambda_handler" ]
data_management/opensearch_indexer/opensearch_indexer/aws_helpers.py (new file)

@@ -0,0 +1,49 @@
import json
import logging
from typing import Any, Dict
from urllib.parse import quote_plus

import boto3
from requests_aws4auth import AWS4Auth

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def get_s3_file(bucket_name: str, object_key: str) -> bytes:
    s3 = boto3.client("s3")
    s3_file_object = s3.get_object(Bucket=bucket_name, Key=object_key)
    return s3_file_object["Body"].read()


def get_secret_data(secret_id: str) -> Dict[str, Any]:
    sm = boto3.client("secretsmanager")
    secret_response = sm.get_secret_value(SecretId=secret_id)
    return json.loads(secret_response["SecretString"])


def _build_db_url(secret_string: Dict[str, Any]) -> str:
    return (
        "postgresql+pg8000://"
        f'{secret_string["DB_USER"]}:{quote_plus(secret_string["DB_PASSWORD"])}'
        f'@{secret_string["DB_HOST"]}:{secret_string["DB_PORT"]}/{secret_string["DB_NAME"]}'
    )


def _get_opensearch_auth(secret_string: Dict[str, Any]) -> AWS4Auth:
    sts_client = boto3.client("sts")
    assumed_role = sts_client.assume_role(
        RoleArn=secret_string["OPEN_SEARCH_MASTER_ROLE_ARN"],
        RoleSessionName="LambdaOpenSearchSession",
    )
    logger.info("Extract temporary credentials to access OpenSearch")
    credentials = assumed_role["Credentials"]
    open_search_http_auth = AWS4Auth(
        credentials["AccessKeyId"],
        credentials["SecretAccessKey"],
        secret_string["AWS_REGION"],
        "es",
        session_token=credentials["SessionToken"],
    )

    return open_search_http_auth
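For context, a minimal sketch of the Secrets Manager payload these helpers expect, with placeholder values. The keys are taken from the code above plus `OPEN_SEARCH_HOST`, which the consignment indexer below reads from the same secret:

```python
# Illustrative secret value only; every value here is a placeholder.
example_secret = {
    "DB_USER": "indexer",
    "DB_PASSWORD": "p@ss/word",  # quote_plus-escaped when the URL is built
    "DB_HOST": "db.example.internal",
    "DB_PORT": "5432",
    "DB_NAME": "records",
    "AWS_REGION": "eu-west-2",
    "OPEN_SEARCH_HOST": "https://search.example.internal",
    "OPEN_SEARCH_MASTER_ROLE_ARN": "arn:aws:iam::123456789012:role/opensearch-master",
}

# _build_db_url(example_secret) would return:
# "postgresql+pg8000://indexer:p%40ss%2Fword@db.example.internal:5432/records"
```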
data_management/opensearch_indexer/opensearch_indexer/index_consignment/bulk_index_consignment.py (new file)

@@ -0,0 +1,259 @@
import json
import logging
from typing import Dict, List, Optional, Tuple, Union

import pg8000
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

from ..aws_helpers import (
    _build_db_url,
    _get_opensearch_auth,
    get_s3_file,
    get_secret_data,
)
from ..text_extraction import add_text_content

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def bulk_index_consignment_from_aws(
    consignment_reference: str, bucket_name: str, secret_id: str
) -> None:
    """
    Retrieve credentials and host information from AWS Secrets Manager, fetch consignment data,
    and index it in OpenSearch.

    Args:
        consignment_reference (str): The reference identifier for the consignment.
        bucket_name (str): The name of the S3 bucket containing file records.
        secret_id (str): The ID of the AWS secret storing database and OpenSearch credentials.

    Returns:
        None
    """
    secret_string = get_secret_data(secret_id)
    database_url = _build_db_url(secret_string)
    open_search_host_url = secret_string["OPEN_SEARCH_HOST"]
    open_search_http_auth = _get_opensearch_auth(secret_string)

    bulk_index_consignment(
        consignment_reference,
        bucket_name,
        database_url,
        open_search_host_url,
        open_search_http_auth,
    )


def bulk_index_consignment(
    consignment_reference: str,
    bucket_name: str,
    database_url: str,
    open_search_host_url: str,
    open_search_http_auth: Union[AWS4Auth, Tuple[str, str]],
    open_search_ca_certs: Optional[str] = None,
) -> None:
    """
    Fetch files associated with a consignment and index them in OpenSearch.

    Args:
        consignment_reference (str): The consignment reference identifier.
        bucket_name (str): The S3 bucket name containing files.
        database_url (str): The connection string for the PostgreSQL database.
        open_search_host_url (str): The host URL of the OpenSearch cluster.
        open_search_http_auth (Union[AWS4Auth, Tuple[str, str]]): The authentication credentials for OpenSearch.
        open_search_ca_certs (Optional[str]): Path to CA certificates for SSL verification.

    Returns:
        None
    """
    files = _fetch_files_in_consignment(consignment_reference, database_url)
    documents_to_index = _construct_documents(files, bucket_name)
    bulk_index_files_in_opensearch(
        documents_to_index,
        open_search_host_url,
        open_search_http_auth,
        open_search_ca_certs,
    )


def _construct_documents(files: List[Dict], bucket_name: str) -> List[Dict]:
    """
    Construct a list of documents to be indexed in OpenSearch from file metadata.

    Args:
        files (List[Dict]): The list of file metadata dictionaries.
        bucket_name (str): The S3 bucket name where the files are stored.

    Returns:
        List[Dict]: A list of documents ready for indexing.
    """
    documents_to_index = []
    for file in files:
        object_key = file["consignment_reference"] + "/" + str(file["file_id"])

        logger.info(f"Processing file: {object_key}")

        file_stream = None

        try:
            file_stream = get_s3_file(bucket_name, object_key)
        except Exception as e:
            logger.error(f"Failed to obtain file {object_key}: {e}")

        # add_text_content enriches the metadata dict with any extracted text;
        # file_stream is None here when the S3 fetch failed.
        document = add_text_content(file, file_stream)

        documents_to_index.append(
            {"file_id": file["file_id"], "document": document}
        )
    return documents_to_index


def _fetch_files_in_consignment(
    consignment_reference: str, database_url: str
) -> List[Dict]:
    """
    Fetch file metadata associated with the given consignment reference.

    Args:
        consignment_reference (str): The consignment reference identifier.
        database_url (str): The connection string for the PostgreSQL database.

    Returns:
        List[Dict]: A list of file metadata dictionaries.
    """
    engine = create_engine(database_url)
    Base = declarative_base()
    Base.metadata.reflect(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()

    query = """
    SELECT
        f."FileId" AS file_id,
        f."FileName" AS file_name,
        f."FileReference" AS file_reference,
        f."FilePath" AS file_path,
        f."CiteableReference" AS citeable_reference,
        s."SeriesId" AS series_id,
        s."Name" AS series_name,
        b."Name" AS transferring_body,
        b."BodyId" AS transferring_body_id,
        b."Description" AS transferring_body_description,
        c."ConsignmentId" AS consignment_id,
        c."ConsignmentReference" AS consignment_reference,
        fm."PropertyName",
        fm."Value"
    FROM
        "File" f
    JOIN
        "Consignment" c ON f."ConsignmentId" = c."ConsignmentId"
    JOIN
        "Series" s ON c."SeriesId" = s."SeriesId"
    JOIN
        "Body" b ON s."BodyId" = b."BodyId"
    LEFT JOIN
        "FileMetadata" fm ON f."FileId" = fm."FileId"
    WHERE
        c."ConsignmentReference" = :consignment_reference
        AND f."FileType" = 'File';
    """
    try:
        result = session.execute(
            text(query), {"consignment_reference": consignment_reference}
        ).fetchall()
    except pg8000.Error as e:
        raise Exception(f"Database query failed: {e}")
    finally:
        session.close()

    # Pivot the joined rows into one dict per file: the repeated file, series,
    # body and consignment columns are stored once, and each FileMetadata
    # (PropertyName, Value) pair becomes an additional key on that file's dict.
    files_data = {}

    for row in result:
        file_id = str(row.file_id)
        if file_id not in files_data:
            files_data[file_id] = {
                "file_id": str(row.file_id),
                "file_name": str(row.file_name),
                "file_reference": str(row.file_reference),
                "file_path": str(row.file_path),
                "citeable_reference": str(row.citeable_reference),
                "series_id": str(row.series_id),
                "series_name": str(row.series_name),
                "transferring_body": str(row.transferring_body),
                "transferring_body_id": str(row.transferring_body_id),
                "transferring_body_description": str(
                    row.transferring_body_description
                ),
                "consignment_id": str(row.consignment_id),
                "consignment_reference": str(row.consignment_reference),
            }

        if row.PropertyName:
            files_data[file_id][row.PropertyName] = str(row.Value)

    return list(files_data.values())


def bulk_index_files_in_opensearch(
    documents: List[Dict[str, Union[str, Dict]]],
    open_search_host_url: str,
    open_search_http_auth: Union[AWS4Auth, Tuple[str, str]],
    open_search_ca_certs: Optional[str] = None,
) -> None:
    """
    Perform bulk indexing of documents in OpenSearch.

    Args:
        documents (List[Dict[str, Union[str, Dict]]]): The documents to index.
        open_search_host_url (str): The OpenSearch cluster URL.
        open_search_http_auth (Union[AWS4Auth, Tuple[str, str]]): The authentication credentials.
        open_search_ca_certs (Optional[str]): Path to CA certificates for SSL verification.

    Returns:
        None
    """
    # Build the NDJSON bulk payload: one action line followed by one document
    # source line per file.
    bulk_data = []
    for doc in documents:
        bulk_data.append(
            json.dumps(
                {"index": {"_index": "documents", "_id": doc["file_id"]}}
            )
        )
        bulk_data.append(json.dumps(doc["document"]))

    bulk_payload = "\n".join(bulk_data) + "\n"

    open_search = OpenSearch(
        open_search_host_url,
        http_auth=open_search_http_auth,
        use_ssl=True,
        verify_certs=True,
        ca_certs=open_search_ca_certs,
        connection_class=RequestsHttpConnection,
    )

    opensearch_index = "documents"

    try:
        response = open_search.bulk(index=opensearch_index, body=bulk_payload)
        logger.info("OpenSearch bulk command executed")
        logger.info(response)

        if response["errors"]:
            logger.error("Errors occurred during bulk indexing")
            for item in response["items"]:
                if "error" in item.get("index", {}):
                    logger.error(
                        f"Error for document ID {item['index']['_id']}: {item['index']['error']}"
                    )
        else:
            logger.info("Bulk indexing completed successfully")
    except Exception as e:
        logger.error(f"Bulk indexing failed: {e}")
data_management/opensearch_indexer/opensearch_indexer/index_consignment/lambda_function.py (new file)

@@ -0,0 +1,53 @@
import logging
import os
from typing import Any, Dict

from .bulk_index_consignment import bulk_index_consignment_from_aws

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event: Dict[str, Any], context: Any) -> None:
    """
    AWS Lambda handler function to trigger the indexing of consignment files into OpenSearch.

    This function is invoked by an AWS event containing details of a consignment. It retrieves
    the necessary parameters from the event and environment variables, then calls the indexing
    function to process and index the files into OpenSearch.

    Args:
        event (Dict[str, Any]): The event data triggering the Lambda function. Expected to contain:
            - `detail` (Dict[str, Any]): A dictionary with `parameters` that includes:
                - `reference` (str): The consignment reference identifier.
        context (Any): AWS Lambda context object (not used in this function).

    Environment Variables:
        BUCKET_NAME (str): The name of the S3 bucket where the files are stored.
        SECRET_ID (str): The identifier for the AWS Secrets Manager secret containing database
            and OpenSearch credentials.

    Raises:
        Exception: If `consignment_reference`, `BUCKET_NAME`, or `SECRET_ID` are missing.
    """
    logger.info("Lambda started")
    logger.info("Event received: %s", event)

    # Extract parameters from the event and environment variables
    detail = event.get("detail", {})
    consignment_reference = detail.get("parameters", {}).get("reference")
    bucket_name = os.getenv("BUCKET_NAME")
    secret_id = os.getenv("SECRET_ID")

    # Validate required parameters
    if not consignment_reference or not bucket_name or not secret_id:
        error_message = "Missing consignment_reference, BUCKET_NAME, or SECRET_ID required for indexing"
        logger.error(error_message)
        raise Exception(error_message)

    # Log and process the consignment reference
    logger.info(f"Processing consignment reference: {consignment_reference}")
    bulk_index_consignment_from_aws(
        consignment_reference, bucket_name, secret_id
    )
    logger.info("Lambda completed")