Create consignment indexing lambda for opensearch
anthonyhashemi committed Dec 3, 2024
1 parent e0a5f5e commit c4adac7
Showing 8 changed files with 417 additions and 37 deletions.
4 changes: 2 additions & 2 deletions data_management/opensearch_indexer/Dockerfile
@@ -8,6 +8,6 @@ COPY requirements.txt ${LAMBDA_TASK_ROOT}

RUN pip install -r requirements.txt

-COPY opensearch_indexer/ ${LAMBDA_TASK_ROOT}/opensearch_indexer
+COPY index_consignment/ ${LAMBDA_TASK_ROOT}/index_consignment

-CMD [ "opensearch_indexer.lambda_function.lambda_handler" ]
+CMD [ "index_consignment.lambda_function.lambda_handler" ]
49 changes: 49 additions & 0 deletions data_management/opensearch_indexer/aws_helpers.py
@@ -0,0 +1,49 @@
import json
import logging
from typing import Any, Dict
from urllib.parse import quote_plus

import boto3
from requests_aws4auth import AWS4Auth

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def get_s3_file(bucket_name: str, object_key: str) -> bytes:
    s3 = boto3.client("s3")
    s3_file_object = s3.get_object(Bucket=bucket_name, Key=object_key)
    return s3_file_object["Body"].read()


def get_secret_data(secret_id: str) -> Dict[str, Any]:
    sm = boto3.client("secretsmanager")
    secret_response = sm.get_secret_value(SecretId=secret_id)
    return json.loads(secret_response["SecretString"])


def _build_db_url(secret_string: Dict[str, Any]) -> str:
    return (
        "postgresql+pg8000://"
        f'{secret_string["DB_USER"]}:{quote_plus(secret_string["DB_PASSWORD"])}'
        f'@{secret_string["DB_HOST"]}:{secret_string["DB_PORT"]}/{secret_string["DB_NAME"]}'
    )


def _get_opensearch_auth(secret_string: Dict[str, Any]) -> AWS4Auth:
    sts_client = boto3.client("sts")
    assumed_role = sts_client.assume_role(
        RoleArn=secret_string["OPEN_SEARCH_MASTER_ROLE_ARN"],
        RoleSessionName="LambdaOpenSearchSession",
    )
    logger.info("Extract temporary credentials to access OpenSearch")
    credentials = assumed_role["Credentials"]
    open_search_http_auth = AWS4Auth(
        credentials["AccessKeyId"],
        credentials["SecretAccessKey"],
        secret_string["AWS_REGION"],
        "es",
        session_token=credentials["SessionToken"],
    )

    return open_search_http_auth
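
For reference, a minimal sketch of how these helpers compose, assuming the secret contains the keys referenced above; the secret id, bucket name and object key are hypothetical:

secret = get_secret_data("opensearch-indexer-secret")  # hypothetical secret id
database_url = _build_db_url(secret)
# e.g. "postgresql+pg8000://user:p%40ssword@db.example.com:5432/mydb" (illustrative values)
aws_auth = _get_opensearch_auth(secret)  # AWS4Auth built from the assumed-role credentials
file_bytes = get_s3_file("records-bucket", "TDR-2024-ABCD/<file-id>")  # hypothetical bucket/key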
Empty file.
@@ -0,0 +1,262 @@
import json
import logging
from typing import Dict, List, Optional, Tuple, Union

from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import declarative_base, sessionmaker

from data_management.opensearch_indexer.aws_helpers import (
    _build_db_url,
    _get_opensearch_auth,
    get_s3_file,
    get_secret_data,
)
from data_management.opensearch_indexer.text_extraction import add_text_content

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def bulk_index_consignment_from_aws(
    consignment_reference: str, bucket_name: str, secret_id: str
) -> None:
    """
    Retrieve credentials and host information from AWS Secrets Manager, fetch consignment data,
    and index it in OpenSearch.

    Args:
        consignment_reference (str): The reference identifier for the consignment.
        bucket_name (str): The name of the S3 bucket containing file records.
        secret_id (str): The ID of the AWS secret storing database and OpenSearch credentials.

    Returns:
        None
    """
    secret_string = get_secret_data(secret_id)
    database_url = _build_db_url(secret_string)
    open_search_host_url = secret_string["OPEN_SEARCH_HOST"]
    open_search_http_auth = _get_opensearch_auth(secret_string)

    bulk_index_consignment(
        consignment_reference,
        bucket_name,
        database_url,
        open_search_host_url,
        open_search_http_auth,
    )


def bulk_index_consignment(
    consignment_reference: str,
    bucket_name: str,
    database_url: str,
    open_search_host_url: str,
    open_search_http_auth: Union[AWS4Auth, Tuple[str, str]],
    open_search_ca_certs: Optional[str] = None,
) -> None:
    """
    Fetch files associated with a consignment and index them in OpenSearch.

    Args:
        consignment_reference (str): The consignment reference identifier.
        bucket_name (str): The S3 bucket name containing files.
        database_url (str): The connection string for the PostgreSQL database.
        open_search_host_url (str): The host URL of the OpenSearch cluster.
        open_search_http_auth (Union[AWS4Auth, Tuple[str, str]]): The authentication credentials for OpenSearch.
        open_search_ca_certs (Optional[str]): Path to CA certificates for SSL verification.

    Returns:
        None
    """
    files = _fetch_files_in_consignment(consignment_reference, database_url)
    documents_to_index = _construct_documents(files, bucket_name)
    bulk_index_files_in_opensearch(
        documents_to_index,
        open_search_host_url,
        open_search_http_auth,
        open_search_ca_certs,
    )


def _construct_documents(files: List[Dict], bucket_name: str) -> List[Dict]:
    """
    Construct a list of documents to be indexed in OpenSearch from file metadata.

    Args:
        files (List[Dict]): The list of file metadata dictionaries.
        bucket_name (str): The S3 bucket name where the files are stored.

    Returns:
        List[Dict]: A list of documents ready for indexing.
    """
    documents_to_index = []
    for file in files:
        object_key = file["consignment_reference"] + "/" + str(file["file_id"])

        logger.info(f"Processing file: {object_key}")

        file_stream = None

        try:
            file_stream = get_s3_file(bucket_name, object_key)
        except Exception as e:
            # Still index the metadata even if the file content cannot be fetched
            logger.error(f"Failed to obtain file {object_key}: {e}")

        document = add_text_content(file, file_stream)

        documents_to_index.append(
            {"file_id": file["file_id"], "document": document}
        )
    return documents_to_index


def _fetch_files_in_consignment(
    consignment_reference: str, database_url: str
) -> List[Dict]:
    """
    Fetch file metadata associated with the given consignment reference.

    Args:
        consignment_reference (str): The consignment reference identifier.
        database_url (str): The connection string for the PostgreSQL database.

    Returns:
        List[Dict]: A list of file metadata dictionaries.
    """
    engine = create_engine(database_url)
    Base = declarative_base()
    Base.metadata.reflect(bind=engine)
    Session = sessionmaker(bind=engine)
    session = Session()

    query = """
        SELECT
            f."FileId" AS file_id,
            f."FileName" AS file_name,
            f."FileReference" AS file_reference,
            f."FilePath" AS file_path,
            f."CiteableReference" AS citeable_reference,
            s."SeriesId" AS series_id,
            s."Name" AS series_name,
            b."Name" AS transferring_body,
            b."BodyId" AS transferring_body_id,
            b."Description" AS transferring_body_description,
            c."ConsignmentId" AS consignment_id,
            c."ConsignmentReference" AS consignment_reference,
            fm."PropertyName",
            fm."Value"
        FROM
            "File" f
        JOIN
            "Consignment" c ON f."ConsignmentId" = c."ConsignmentId"
        JOIN
            "Series" s ON c."SeriesId" = s."SeriesId"
        JOIN
            "Body" b ON s."BodyId" = b."BodyId"
        LEFT JOIN
            "FileMetadata" fm ON f."FileId" = fm."FileId"
        WHERE
            c."ConsignmentReference" = :consignment_reference
            AND f."FileType" = 'File';
    """
    try:
        result = session.execute(
            text(query), {"consignment_reference": consignment_reference}
        ).fetchall()
    except SQLAlchemyError as e:
        raise Exception(f"Database query failed: {e}")
    finally:
        session.close()

    # Process query results, grouping the one-row-per-metadata-property result set by file
    files_data = {}
    for row in result:
        file_id = str(row.file_id)
        if file_id not in files_data:
            files_data[file_id] = {
                "file_id": str(row.file_id),
                "file_name": str(row.file_name),
                "file_reference": str(row.file_reference),
                "file_path": str(row.file_path),
                "citeable_reference": str(row.citeable_reference),
                "series_id": str(row.series_id),
                "series_name": str(row.series_name),
                "transferring_body": str(row.transferring_body),
                "transferring_body_id": str(row.transferring_body_id),
                "transferring_body_description": str(
                    row.transferring_body_description
                ),
                "consignment_id": str(row.consignment_id),
                "consignment_reference": str(row.consignment_reference),
            }

        if row.PropertyName:
            files_data[file_id][row.PropertyName] = str(row.Value)

    return list(files_data.values())
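
# Illustrative shape of each returned dictionary (values hypothetical): the fixed columns selected
# above, plus any FileMetadata properties for that file merged in as additional top-level keys, e.g.
#   {"file_id": "<uuid>", "file_name": "report.pdf", ..., "date_last_modified": "<value>"}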


def bulk_index_files_in_opensearch(
    documents: List[Dict[str, Union[str, Dict]]],
    open_search_host_url: str,
    open_search_http_auth: Union[AWS4Auth, Tuple[str, str]],
    open_search_ca_certs: Optional[str] = None,
) -> None:
    """
    Perform bulk indexing of documents in OpenSearch.

    Args:
        documents (List[Dict[str, Union[str, Dict]]]): The documents to index.
        open_search_host_url (str): The OpenSearch cluster URL.
        open_search_http_auth (Union[AWS4Auth, Tuple[str, str]]): The authentication credentials.
        open_search_ca_certs (Optional[str]): Path to CA certificates for SSL verification.

    Returns:
        None
    """
    bulk_data = []
    for doc in documents:
        # Each document contributes an action line followed by the document source line
        bulk_data.append(
            json.dumps(
                {"index": {"_index": "documents", "_id": doc["file_id"]}}
            )
        )
        bulk_data.append(json.dumps(doc["document"]))

    bulk_payload = "\n".join(bulk_data) + "\n"
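    # The payload is newline-delimited JSON as required by the OpenSearch bulk API: for each
    # document, an action line such as {"index": {"_index": "documents", "_id": "<file_id>"}}
    # followed by the JSON-serialised document itself, with a trailing newline ending the body.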

    open_search = OpenSearch(
        open_search_host_url,
        http_auth=open_search_http_auth,
        use_ssl=True,
        verify_certs=True,
        ca_certs=open_search_ca_certs,
        connection_class=RequestsHttpConnection,
    )

    opensearch_index = "documents"

    try:
        response = open_search.bulk(index=opensearch_index, body=bulk_payload)
        logger.info("Opensearch bulk command executed")
        logger.info(response)
        if response["errors"]:
            logger.error("Errors occurred during bulk indexing")
            for item in response["items"]:
                if "error" in item.get("index", {}):
                    logger.error(
                        f"Error for document ID {item['index']['_id']}: {item['index']['error']}"
                    )
        else:
            logger.info("Bulk indexing completed successfully")
    except Exception as e:
        logger.error(f"Bulk indexing failed: {e}")
@@ -0,0 +1,53 @@
import logging
import os
from typing import Any, Dict

from .bulk_index_consignment import bulk_index_consignment_from_aws

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event: Dict[str, Any], context: Any) -> None:
    """
    AWS Lambda handler function to trigger the indexing of consignment files into OpenSearch.

    This function is invoked by an AWS event containing details of a consignment. It retrieves
    the necessary parameters from the event and environment variables, then calls the indexing
    function to process and index the files into OpenSearch.

    Args:
        event (Dict[str, Any]): The event data triggering the Lambda function. Expected to contain:
            - `detail` (Dict[str, Any]): A dictionary with `parameters` that includes:
                - `reference` (str): The consignment reference identifier.
        context (Any): AWS Lambda context object (not used in this function).

    Environment Variables:
        BUCKET_NAME (str): The name of the S3 bucket where the files are stored.
        SECRET_ID (str): The identifier for the AWS Secrets Manager secret containing database
            and OpenSearch credentials.

    Raises:
        Exception: If `consignment_reference`, `BUCKET_NAME`, or `SECRET_ID` are missing.
    """
    logger.info("Lambda started")
    logger.info("Event received: %s", event)

    # Extract parameters from the event and environment variables
    detail = event.get("detail", {})
    consignment_reference = detail.get("parameters", {}).get("reference")
    bucket_name = os.getenv("BUCKET_NAME")
    secret_id = os.getenv("SECRET_ID")

    # Validate required parameters
    if not consignment_reference or not bucket_name or not secret_id:
        error_message = "Missing consignment_reference, BUCKET_NAME, or SECRET_ID required for indexing"
        logger.error(error_message)
        raise Exception(error_message)

    # Log and process the consignment reference
    logger.info(f"Processing consignment reference: {consignment_reference}")
    bulk_index_consignment_from_aws(
        consignment_reference, bucket_name, secret_id
    )
    logger.info("Lambda completed")