Skip to content

Commit

Permalink
address Heitor's suggestion
Browse files Browse the repository at this point in the history
  • Loading branch information
roger-zhangg committed Sep 7, 2023
1 parent a6c05a5 commit a3ed9f9
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 49 deletions.
12 changes: 6 additions & 6 deletions aws_lambda_powertools/utilities/data_classes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
from .event_source import event_source
from .kafka_event import KafkaEvent
from .kinesis_firehose_event import (
KinesisFirehoseDataTransformationRecord,
KinesisFirehoseDataTransformationRecordMetadata,
KinesisFirehoseDataTransformationResponse,
KinesisFirehoseEvent,
KinesisFirehoseResponse,
KinesisFirehoseResponseRecord,
KinesisFirehoseResponseRecordMetadata,
)
from .kinesis_stream_event import KinesisStreamEvent
from .lambda_function_url_event import LambdaFunctionUrlEvent
Expand All @@ -42,9 +42,9 @@
"KafkaEvent",
"KinesisFirehoseEvent",
"KinesisStreamEvent",
"KinesisFirehoseResponse",
"KinesisFirehoseResponseRecord",
"KinesisFirehoseResponseRecordMetadata",
"KinesisFirehoseDataTransformationResponse",
"KinesisFirehoseDataTransformationRecord",
"KinesisFirehoseDataTransformationRecordMetadata",
"LambdaFunctionUrlEvent",
"S3Event",
"S3EventBridgeNotificationEvent",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import base64
import json
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterator, List, Optional

from typing_extensions import Literal
Expand All @@ -9,7 +9,7 @@


@dataclass
class KinesisFirehoseResponseRecordMetadata:
class KinesisFirehoseDataTransformationRecordMetadata:
"""
Documentation:
--------------
Expand All @@ -18,14 +18,14 @@ class KinesisFirehoseResponseRecordMetadata:

partition_keys: Optional[Dict[str, str]]

def asdict(self) -> Optional[Dict]:
def asdict(self) -> Dict:
if self.partition_keys is not None:
return {"partitionKeys": self.partition_keys}
return None
return {}


@dataclass
class KinesisFirehoseResponseRecord:
class KinesisFirehoseDataTransformationRecord:
"""Record in Kinesis Data Firehose response object
Documentation:
Expand All @@ -36,13 +36,13 @@ class KinesisFirehoseResponseRecord:
# Record ID; uniquely identifies this record within the current batch"""
record_id: str
# Processing result, supported value: Ok, Dropped, ProcessingFailed"""
result: Literal["Ok", "Dropped", "ProcessingFailed"]
result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok"
# data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or
# use either function like `data_from_text`, `data_from_json` to populate data"""
data: Optional[str] = None
# Optional: Metadata associated with this record; can contain partition keys
# See - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None
metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None
_json_data: Optional[Any] = None
json_serializer: Callable = json.dumps
json_deserializer: Callable = json.loads
Expand Down Expand Up @@ -70,31 +70,31 @@ def asdict(self) -> Dict:
return r

@property
def data_as_bytes(self) -> Optional[bytes]:
def data_as_bytes(self) -> bytes:
"""Decoded base64-encoded data as bytes"""
if not self.data:
return None
return b""
return base64.b64decode(self.data)

@property
def data_as_text(self) -> Optional[str]:
def data_as_text(self) -> str:
"""Decoded base64-encoded data as text"""
if not self.data_as_bytes:
return None
if not self.data:
return ""
return self.data_as_bytes.decode("utf-8")

@property
def data_as_json(self) -> Optional[Dict]:
def data_as_json(self) -> Dict:
"""Decoded base64-encoded data loaded to json"""
if not self.data_as_text:
return None
if not self.data:
return {}
if self._json_data is None:
self._json_data = self.json_deserializer(self.data_as_text)
return self._json_data


@dataclass
class KinesisFirehoseResponse:
class KinesisFirehoseDataTransformationResponse:
"""Kinesis Data Firehose response object
Documentation:
Expand All @@ -108,13 +108,10 @@ class KinesisFirehoseResponse:
optional parameter at start. can be added later using `add_record` function.
"""

records: Optional[List[KinesisFirehoseResponseRecord]] = None
records: List[KinesisFirehoseDataTransformationRecord] = field(default_factory=list)

def add_record(self, record: KinesisFirehoseResponseRecord):
if self.records:
self.records.append(record)
else:
self.records = [record]
def add_record(self, record: KinesisFirehoseDataTransformationRecord):
self.records.append(record)

def asdict(self) -> Dict:
if not self.records:
Expand Down Expand Up @@ -196,12 +193,12 @@ def data_as_json(self) -> dict:
self._json_data = self._json_deserializer(self.data_as_text)
return self._json_data

def create_firehose_response_record(
def build_data_transformation_response(
self,
result: Literal["Ok", "Dropped", "ProcessingFailed"],
result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok",
data: Optional[str] = None,
metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None,
) -> KinesisFirehoseResponseRecord:
metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None,
) -> KinesisFirehoseDataTransformationRecord:
"""create a KinesisFirehoseResponseRecord directly using the record_id and given values
Parameters
----------
Expand All @@ -214,7 +211,12 @@ def create_firehose_response_record(
Metadata associated with this record; can contain partition keys
- https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
"""
return KinesisFirehoseResponseRecord(record_id=self.record_id, result=result, data=data, metadata=metadata)
return KinesisFirehoseDataTransformationRecord(
record_id=self.record_id,
result=result,
data=data,
metadata=metadata,
)


class KinesisFirehoseEvent(DictWrapper):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from aws_lambda_powertools.utilities.data_classes import (
KinesisFirehoseDataTransformationResponse,
KinesisFirehoseEvent,
KinesisFirehoseResponse,
event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
result = KinesisFirehoseResponse()
result = KinesisFirehoseDataTransformationResponse()

for record in event.records:
# if data was delivered as json; caches loaded value
Expand All @@ -18,7 +18,7 @@ def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
## generate data to return
new_data = {"tool_used": "powertools_dataclass", "original_payload": data}

processed_record = record.create_firehose_response_record(result="Ok")
processed_record = record.build_data_transformation_response(result="Ok")
processed_record.data_from_json(data=new_data)

result.add_record(processed_record)
Expand Down
8 changes: 4 additions & 4 deletions examples/event_sources/src/kinesis_firehose_response.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import base64

from aws_lambda_powertools.utilities.data_classes import (
KinesisFirehoseResponse,
KinesisFirehoseResponseRecord,
KinesisFirehoseDataTransformationRecord,
KinesisFirehoseDataTransformationResponse,
)


def lambda_handler(event, context):
result = KinesisFirehoseResponse()
result = KinesisFirehoseDataTransformationResponse()

for record in event["records"]:
print(record["recordId"])
Expand All @@ -16,7 +16,7 @@ def lambda_handler(event, context):
## generate data to return
new_data = {"tool_used": "powertools_dataclass", "original_payload": payload}

processed_record = KinesisFirehoseResponseRecord(record_id=record["recordId"], result="Ok")
processed_record = KinesisFirehoseDataTransformationRecord(record_id=record["recordId"], result="Ok")
processed_record.data_from_text(data=new_data)
result.add_record(processed_record)

Expand Down
18 changes: 9 additions & 9 deletions tests/unit/data_classes/test_kinesis_firehose_response.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from aws_lambda_powertools.utilities.data_classes import (
KinesisFirehoseDataTransformationRecord,
KinesisFirehoseDataTransformationRecordMetadata,
KinesisFirehoseDataTransformationResponse,
KinesisFirehoseEvent,
KinesisFirehoseResponse,
KinesisFirehoseResponseRecord,
KinesisFirehoseResponseRecordMetadata,
)
from tests.functional.utils import load_event

Expand All @@ -11,13 +11,13 @@ def test_kinesis_firehose_response():
raw_event = load_event("kinesisFirehoseKinesisEvent.json")
parsed_event = KinesisFirehoseEvent(data=raw_event)

response = KinesisFirehoseResponse()
response = KinesisFirehoseDataTransformationResponse()
for record in parsed_event.records:
# if data was delivered as json; caches loaded value
data = record.data_as_text

metadata_partition = KinesisFirehoseResponseRecordMetadata(partition_keys={"year": 2023})
processed_record = KinesisFirehoseResponseRecord(
metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": 2023})
processed_record = KinesisFirehoseDataTransformationRecord(
record_id=record.record_id,
result="Ok",
metadata=metadata_partition,
Expand All @@ -43,12 +43,12 @@ def test_kinesis_firehose_create_response():
raw_event = load_event("kinesisFirehoseKinesisEvent.json")
parsed_event = KinesisFirehoseEvent(data=raw_event)

response = KinesisFirehoseResponse()
response = KinesisFirehoseDataTransformationResponse()
for record in parsed_event.records:
# if data was delivered as json; caches loaded value
data = record.data_as_text
metadata_partition = KinesisFirehoseResponseRecordMetadata(partition_keys={"year": 2023})
processed_record = record.create_firehose_response_record(
metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": 2023})
processed_record = record.build_data_transformation_response(
result="Ok",
metadata=metadata_partition,
)
Expand Down

0 comments on commit a3ed9f9

Please sign in to comment.