DE-101: Re-platform application to Python #11

Merged: 20 commits, Jul 23, 2024
Changes from 4 commits
13 changes: 10 additions & 3 deletions .github/workflows/run-unit-tests.yml
@@ -1,23 +1,30 @@
 name: Run Python unit tests

-on: [ push ]
+on:
+  pull_request:
+    types: [ labeled, unlabeled, opened, reopened, synchronize ]

 jobs:
   changelog:
     name: Updates changelog
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
       - uses: dangoslen/changelog-enforcer@v3
   test:
     runs-on: ubuntu-latest
     steps:
       - name: Checkout repo
         uses: actions/checkout@v3

-      - name: Set up Python 3.9
+      - name: Set up Python 3.12
         uses: actions/setup-python@v4
         with:
-          python-version: '3.9'
+          python-version: '3.12'
           cache: 'pip'

       - name: Install dependencies
         run: |
           python -m pip install --upgrade pip
           pip install -r devel_requirements.txt
           pip install -r requirements.txt

       - name: Run linter and test suite
2 changes: 1 addition & 1 deletion .python-version
@@ -1 +1 @@
-3.9.16
+3.12.0
2 changes: 1 addition & 1 deletion README.md
@@ -16,7 +16,7 @@ sam local invoke --profile nypl-digital-dev -t config/sam.qa.yml -e sample/fireh

The [sample](./sample) folder contains sample Firehose events and their expected outcomes after Lambda event handling, so you can test the efficacy of your code with various schemas.

-With Python, you also have the option of using the [python-lambda-local](https://pypi.org/project/python-lambda-local/) package for local development!
+With Python, you also have the option of using the [python-lambda-local](https://pypi.org/project/python-lambda-local/) package for local development! You will need to create a JSON file with env variables to use said package.
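For reference, such an env-variable file is typically a flat JSON object. A hypothetical sketch, using the variable names that appear elsewhere in this PR (the file name and the exact flag for passing it to python-lambda-local are assumptions, not confirmed by this diff):

```json
{
  "ENVIRONMENT": "devel",
  "NYPL_DATA_API_BASE_URL": "https://qa-platform.nypl.org/api/v0.1/",
  "SCHEMA_PATH": "current-schemas/"
}
```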

## Contributing / Deployment

1 change: 0 additions & 1 deletion config/devel.yaml
@@ -2,5 +2,4 @@
PLAINTEXT_VARIABLES:
ENVIRONMENT: devel
NYPL_DATA_API_BASE_URL: https://qa-platform.nypl.org/api/v0.1/
SCHEMA_PATH: current-schemas/
...
6 changes: 0 additions & 6 deletions config/production.yaml

This file was deleted.

6 changes: 0 additions & 6 deletions config/qa.yaml

This file was deleted.

4 changes: 1 addition & 3 deletions config/sam.qa.yml
Contributor: Is this file used by the python lambda package you mentioned? Also remove SCHEMA_NAME var below

Contributor (Author): It can be used for sam local or the python lambda local usage! This is mainly a holdover from the NodeJS implementation, since we have plenty of sample events (in the sample folder) to use and verify when running local changes.

Contributor: So does sam local or the python lambda package actually run the QA Lambda function, or does it just mimic it? Like, when you run it, could I go on AWS and see the logs from the run?

Contributor (Author): sam local mimics the Lambda function locally and stores the logs locally, so this will not be visible on AWS. When you run the command, the output shows up in the terminal.
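To illustrate the distinction discussed above: local invocation just feeds a sample event to the handler in-process, so nothing reaches AWS or CloudWatch. A minimal sketch (the handler body here is a hypothetical stand-in, not the repo's actual lambda_function):

```python
import json

# Hypothetical stand-in for lambda_function.lambda_handler; the real module
# lives in the repo and is not importable here.
def lambda_handler(event, context):
    return {"records": event.get("records", [])}

# What `sam local invoke` does conceptually: load a sample Firehose event
# from disk, call the handler directly, and print the result to the
# terminal instead of sending logs to CloudWatch.
sample_event = {"records": []}  # in practice: json.load(open("sample/<event>.json"))
result = lambda_handler(sample_event, None)
print(json.dumps(result))  # {"records": []}
```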

@@ -8,11 +8,9 @@ Resources:
     Properties:
       CodeUri: .
       Handler: lambda_function.lambda_handler
-      Runtime: python3.9
+      Runtime: python3.12
       Timeout: 10
       Environment:
         Variables:
           NYPL_DATA_API_BASE_URL: https://qa-platform.nypl.org/api/v0.1/
-          SCHEMA_NAME: CircTrans
-          SCHEMA_PATH: current-schemas/
           LOG_LEVEL: debug
8 changes: 8 additions & 0 deletions devel_requirements.txt
@@ -0,0 +1,8 @@
+black
+boto3
+botocore
+nypl-py-utils[avro-client]==1.2.0
+pybase64
+python-csv
+python-io
+pyyaml
33 changes: 16 additions & 17 deletions lambda_function.py
@@ -1,28 +1,27 @@
 import os
 import re

-from nypl_py_utils.functions.log_helper import create_log
 from nypl_py_utils.functions.config_helper import load_env_file
+from nypl_py_utils.functions.log_helper import create_log
 from record_processor import RecordProcessor


 def lambda_handler(event, context):
     logger = create_log("lambda_function")
-    load_env_file(os.environ["ENVIRONMENT"], "config/{}.yaml")
+    if os.environ["ENVIRONMENT"] == "devel":
+        load_env_file(os.environ["ENVIRONMENT"], "config/{}.yaml")

     logger.info("Starting event processing...")

     if event is None:
         logger.error("Event is undefined.")
-        # TODO: raise exception here?
+        raise RecordParsingError("No event found.")
     else:
         # All records under one event will have the same schema
-        schema_name = _pull_schema_name(event)
-        os.environ["SCHEMA_NAME"] = schema_name
+        schema_name = _pull_schema_name(
+            event["sourceKinesisStreamArn"])
         schema_url = (
-            os.environ["NYPL_DATA_API_BASE_URL"]
-            + os.environ["SCHEMA_PATH"]
-            + f"{schema_name}"
+            os.environ["NYPL_DATA_API_BASE_URL"] + "current-schemas/" + f"{schema_name}"
         )
         output_format = "json" if schema_name != "LocationHours" else "csv"

@@ -38,13 +37,14 @@ def lambda_handler(event, context):
             if "data" in record:
                 result = processor.process_record(record, output_format)
                 if "ProcessingFailed" in result["result"]:
+                    logger.error(
+                        f"Error processing record data: {result}")
                     failures += 1
                 else:
                     successes += 1
                 processed_records.append(result)
     except Exception as e:
         # Catch any errors in the case the event has no records, etc
         logger.error(f"Error processing records: {repr(e)}")
         raise RecordParsingError(e)

     logger.info(
@@ -55,22 +55,21 @@ def lambda_handler(event, context):
     return {"records": processed_records}


-def _pull_schema_name(event):
-    """Given a Firehouse event, pulls encoded schema type from stream ARN.
+def _pull_schema_name(stream_arn):
+    """
+    Given a Firehose event's stream ARN, pulls encoded schema type.
     Example input -- "arn:aws:kinesis:us-east-1:946183545209:stream/PcReserve-production"
     Example output -- "PcReserve"
     """
-    filtered_for_stream_name = event["sourceKinesisStreamArn"].split(":").pop()
-    # Against convention, the "CircTransAnon" stream contains "CircTrans"
-    # encoded records, so ensure the correct schema name is chosen
+    filtered_for_stream_name = stream_arn.split(":").pop()
     replacements = [
         ("^stream/", ""),
-        ("-[a-z]+$", ""),
-        ("^CircTransAnon$", "CircTrans"),
+        ("-[a-z]+$", "")
     ]

     for old, new in replacements:
-        filtered_for_stream_name = re.sub(old, new, filtered_for_stream_name)
+        filtered_for_stream_name = re.sub(
+            old, new, filtered_for_stream_name)
     return filtered_for_stream_name
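As a sanity check on the new `_pull_schema_name` logic, the regex chain can be exercised standalone. This sketch simply restates the function above outside the Lambda context:

```python
import re

def pull_schema_name(stream_arn):
    # Take the final ARN segment, e.g. "stream/PcReserve-production"
    name = stream_arn.split(":").pop()
    # Strip the "stream/" prefix, then the "-production"/"-qa" style suffix
    for old, new in [("^stream/", ""), ("-[a-z]+$", "")]:
        name = re.sub(old, new, name)
    return name

print(pull_schema_name(
    "arn:aws:kinesis:us-east-1:946183545209:stream/PcReserve-production"))
# PcReserve
```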
8 changes: 4 additions & 4 deletions record_processor.py
@@ -3,8 +3,8 @@
 import io
 import json

-from nypl_py_utils.functions.log_helper import create_log
 from nypl_py_utils.classes.avro_client import AvroDecoder
+from nypl_py_utils.functions.log_helper import create_log


 class RecordProcessor:
@@ -19,7 +19,8 @@ def process_record(self, record, output_format):
         desired output format for Firehose (JSON or CSV string)
         """
         binary_record_data = base64.b64decode(record["data"])
-        decoded_record_data = self.avro_decoder.decode_record(binary_record_data)
+        decoded_record_data = self.avro_decoder.decode_record(
+            binary_record_data)

         if decoded_record_data is None:
             # Unable to decode Avro record
@@ -43,8 +44,7 @@ def _format_result_string(self, output_format, data):
             data = self._transform_dictionary_to_csv_string(data)
         else:
             data = json.dumps(data)
-        # After decoding, convert to base64. More often than not,
-        # the original data is either encoded or not in base64
+        # encode output data to base64
        to_bytes = data.encode("utf-8")
        return (base64.b64encode(to_bytes)).decode("utf-8")
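The `_format_result_string` flow above boils down to: serialize the decoded record, then base64-encode the result for Firehose. A self-contained sketch of that round trip, with a plain dict standing in for a real Avro-decoded record:

```python
import base64
import json

def format_result_string(data):
    # Serialize the decoded record to JSON, then base64-encode the
    # UTF-8 bytes, as Firehose expects for transformed record data
    as_json = json.dumps(data)
    return base64.b64encode(as_json.encode("utf-8")).decode("utf-8")

encoded = format_result_string({"recordId": "789", "result": "Ok"})
# Decoding the output recovers the original dict
print(json.loads(base64.b64decode(encoded)))
```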
7 changes: 2 additions & 5 deletions requirements.txt
@@ -1,8 +1,5 @@
-black
 boto3
 botocore
-nypl-py-utils[avro, requests]==1.2.0
+nypl-py-utils[avro-client]==1.2.0
 pybase64
-python-csv
-python-io
-pyyaml
+pyyaml
1 change: 0 additions & 1 deletion tests/conftest.py
@@ -6,7 +6,6 @@
TEST_ENV_VARS = {
"ENVIRONMENT": "test",
"NYPL_DATA_API_BASE_URL": "https://qa-platform.nypl.org/api/v0.1/",
"SCHEMA_PATH": "current-schemas/",
}


9 changes: 4 additions & 5 deletions tests/test_lambda_function.py
@@ -20,7 +20,7 @@
     },
 ]

-circ_trans_processed_records =[
+circ_trans_processed_records = [
     {
         "recordId": "789",
         "result": "Ok",
@@ -61,21 +61,20 @@ def test_instance_1_failure_2_success(self, mocker, test_data):
)

     def test_lambda_handler_no_event_error(self, test_instance_3_success, caplog):
         with caplog.at_level(logging.ERROR):
             with pytest.raises(lambda_function.RecordParsingError):
                 lambda_function.lambda_handler(None, None)
             assert "Event is undefined." in caplog.text

-    def test_lambda_handler_no_event_records_exception(
+    def test_lambda_handler_no_event_records_return_empty_array(
             self, test_instance_3_success, caplog):
         event = {
             "invocationId": "invocationIdExample",
             "deliveryStreamArn": "deliveryExample",
             "sourceKinesisStreamArn": "streamExample",
             "region": "us-east-1",
         }
-        with pytest.raises(lambda_function.RecordParsingError):
+        with pytest.raises(Exception):
             (lambda_function.lambda_handler(event, None))
         assert "Error processing records: KeyError('records')" in caplog.text

     def test_lambda_handler_success(
             self, test_instance_3_success, test_data, caplog):