From 040358d010e6a78e0c11d41d96552d468d325774 Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Wed, 30 Aug 2023 00:33:04 +0000 Subject: [PATCH 01/32] support kinesis response --- .../utilities/data_classes/__init__.py | 20 +++ .../data_classes/kinesis_firehose_response.py | 125 ++++++++++++++++++ .../src/kinesis_firehose_delivery_stream.py | 20 +-- .../test_kinesis_firehose_response.py | 36 +++++ 4 files changed, 192 insertions(+), 9 deletions(-) create mode 100644 aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py create mode 100644 tests/unit/data_classes/test_kinesis_firehose_response.py diff --git a/aws_lambda_powertools/utilities/data_classes/__init__.py b/aws_lambda_powertools/utilities/data_classes/__init__.py index c619104fda..581ad12ee9 100644 --- a/aws_lambda_powertools/utilities/data_classes/__init__.py +++ b/aws_lambda_powertools/utilities/data_classes/__init__.py @@ -15,6 +15,17 @@ from .event_source import event_source from .kafka_event import KafkaEvent from .kinesis_firehose_event import KinesisFirehoseEvent +from .kinesis_firehose_response import ( + FirehoseStateDropped, + FirehoseStateFailed, + FirehoseStateOk, + KinesisFirehoseResponce, + KinesisFirehoseResponceFactory, + KinesisFirehoseResponceRecord, + KinesisFirehoseResponceRecordFactory, + KinesisFirehoseResponseRecordMetadata, + KinesisFirehoseResponseRecordMetadataFactory, +) from .kinesis_stream_event import KinesisStreamEvent from .lambda_function_url_event import LambdaFunctionUrlEvent from .s3_event import S3Event, S3EventBridgeNotificationEvent @@ -37,6 +48,15 @@ "KafkaEvent", "KinesisFirehoseEvent", "KinesisStreamEvent", + "KinesisFirehoseResponce", + "KinesisFirehoseResponceRecord", + "KinesisFirehoseResponseRecordMetadata", + "FirehoseStateOk", + "FirehoseStateDropped", + "FirehoseStateFailed", + "KinesisFirehoseResponceFactory", + "KinesisFirehoseResponceRecordFactory", + "KinesisFirehoseResponseRecordMetadataFactory", "LambdaFunctionUrlEvent", "S3Event", "S3EventBridgeNotificationEvent", diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py new file mode 100644 index 0000000000..f6cf866cd7 --- /dev/null +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +import base64 +from typing import Callable, Iterator, List, Optional, Union + +from aws_lambda_powertools.utilities.data_classes.common import DictWrapper + +FirehoseStateOk = "Ok" +FirehoseStateDropped = "Dropped" +FirehoseStateFailed = "ProcessingFailed" + + +class KinesisFirehoseResponseRecordMetadata(DictWrapper): + """ + Documentation: + -------------- + - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html + """ + + @property + def _metadata(self) -> Optional[dict]: + """Optional: metadata associated with this record; present only when Kinesis Stream is source""" + return self["metadata"] # could raise KeyError + + @property + def partition_keys(self) -> Optional[dict[str, str]]: + """Kinesis stream partition key; present only when Kinesis Stream is source""" + return self._metadata["partitionKeys"] + + +def KinesisFirehoseResponseRecordMetadataFactory( + partition_keys: dict[str, str], + json_deserializer: Optional[Callable] = None, +) -> KinesisFirehoseResponseRecordMetadata: + data = { + "metadata": { + "partitionKeys": partition_keys, + }, + } + return KinesisFirehoseResponseRecordMetadata(data=data, 
json_deserializer=json_deserializer) + + +class KinesisFirehoseResponceRecord(DictWrapper): + """Record in Kinesis Data Firehose event + + Documentation: + -------------- + - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html + """ + + @property + def record_id(self) -> str: + """Record ID; uniquely identifies this record within the current batch""" + return self["recordId"] + + @property + def result(self) -> Union[FirehoseStateOk, FirehoseStateDropped, FirehoseStateFailed]: + """processing result, supported value: Ok, Dropped, ProcessingFailed""" + return self["result"] + + @property + def data(self) -> str: + """The data blob, base64-encoded""" + return self["data"] + + @property + def metadata(self) -> Optional[KinesisFirehoseResponseRecordMetadata]: + """Optional: metadata associated with this record; present only when Kinesis Stream is source""" + return KinesisFirehoseResponseRecordMetadata(self._data) if self.get("metadata") else None + + @property + def data_as_bytes(self) -> bytes: + """Decoded base64-encoded data as bytes""" + return base64.b64decode(self.data) + + @property + def data_as_text(self) -> str: + """Decoded base64-encoded data as text""" + return self.data_as_bytes.decode("utf-8") + + @property + def data_as_json(self) -> dict: + """Decoded base64-encoded data loaded to json""" + if self._json_data is None: + self._json_data = self._json_deserializer(self.data_as_text) + return self._json_data + + +def KinesisFirehoseResponceRecordFactory( + record_id: str, + result: Union[FirehoseStateOk, FirehoseStateDropped, FirehoseStateFailed], + data: str, + metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None, + json_deserializer: Optional[Callable] = None, +) -> KinesisFirehoseResponceRecord: + pass_data = { + "recordId": record_id, + "result": result, + "data": base64.b64encode(data.encode("utf-8")).decode("utf-8"), + } + if metadata: + data["metadata"] = metadata + return KinesisFirehoseResponceRecord(data=pass_data, json_deserializer=json_deserializer) + + +class KinesisFirehoseResponce(DictWrapper): + """Kinesis Data Firehose event + + Documentation: + -------------- + - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html + """ + + @property + def records(self) -> Iterator[KinesisFirehoseResponceRecord]: + for record in self["records"]: + yield KinesisFirehoseResponceRecord(data=record, json_deserializer=self._json_deserializer) + + +def KinesisFirehoseResponceFactory( + records: List[KinesisFirehoseResponceRecord], + json_deserializer: Optional[Callable] = None, +) -> KinesisFirehoseResponce: + pass_data = {"records": records} + return KinesisFirehoseResponce(data=pass_data, json_deserializer=json_deserializer) diff --git a/examples/event_sources/src/kinesis_firehose_delivery_stream.py b/examples/event_sources/src/kinesis_firehose_delivery_stream.py index 770bfb1ee6..4b5d3a8721 100644 --- a/examples/event_sources/src/kinesis_firehose_delivery_stream.py +++ b/examples/event_sources/src/kinesis_firehose_delivery_stream.py @@ -1,8 +1,10 @@ -import base64 import json from aws_lambda_powertools.utilities.data_classes import ( + FirehoseStateOk, KinesisFirehoseEvent, + KinesisFirehoseResponce, + KinesisFirehoseResponceRecordFactory, event_source, ) from aws_lambda_powertools.utilities.typing import LambdaContext @@ -10,19 +12,19 @@ @event_source(data_class=KinesisFirehoseEvent) def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): - result = [] + result = KinesisFirehoseResponce({}) for record in 
event.records: # if data was delivered as json; caches loaded value data = record.data_as_json - processed_record = { - "recordId": record.record_id, - "data": base64.b64encode(json.dumps(data).encode("utf-8")), - "result": "Ok", - } + processed_record = KinesisFirehoseResponceRecordFactory( + record_id=record.record_id, + result=FirehoseStateOk, + data=(json.dumps(data)), + ) - result.append(processed_record) + result.add_record(processed_record) # return transformed records - return {"records": result} + return result diff --git a/tests/unit/data_classes/test_kinesis_firehose_response.py b/tests/unit/data_classes/test_kinesis_firehose_response.py new file mode 100644 index 0000000000..3ed5f0d8db --- /dev/null +++ b/tests/unit/data_classes/test_kinesis_firehose_response.py @@ -0,0 +1,36 @@ +from aws_lambda_powertools.utilities.data_classes import ( + FirehoseStateOk, + KinesisFirehoseEvent, + KinesisFirehoseResponceFactory, + KinesisFirehoseResponceRecordFactory, +) +from tests.functional.utils import load_event + + +def test_kinesis_firehose_response(): + raw_event = load_event("kinesisFirehoseKinesisEvent.json") + parsed_event = KinesisFirehoseEvent(raw_event) + + result = [] + for record in parsed_event.records: + # if data was delivered as json; caches loaded value + data = record.data_as_text + + processed_record = KinesisFirehoseResponceRecordFactory( + record_id=record.record_id, + result=FirehoseStateOk, + data=(data), + ) + + result.append(processed_record) + response = KinesisFirehoseResponceFactory(result) + + res_records = list(response.records) + assert len(res_records) == 2 + record_01, record_02 = res_records[:] + record01_raw = raw_event["records"][0] + assert record_01.result == FirehoseStateOk + assert record_01.record_id == record01_raw["recordId"] + assert record_01.data_as_bytes == b"Hello World" + assert record_01.data_as_text == "Hello World" + assert record_01.data == record01_raw["data"] From 7ca016e3aa41fb71a4386d83e42a50ce7f055377 Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Wed, 30 Aug 2023 20:42:47 +0000 Subject: [PATCH 02/32] fix lint, address Leandro suggestions --- .../utilities/data_classes/__init__.py | 3 --- .../data_classes/kinesis_firehose_response.py | 24 +++++++------------ .../src/kinesis_firehose_delivery_stream.py | 11 ++++----- .../test_kinesis_firehose_response.py | 5 ++-- 4 files changed, 16 insertions(+), 27 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/__init__.py b/aws_lambda_powertools/utilities/data_classes/__init__.py index 581ad12ee9..6908762530 100644 --- a/aws_lambda_powertools/utilities/data_classes/__init__.py +++ b/aws_lambda_powertools/utilities/data_classes/__init__.py @@ -16,9 +16,6 @@ from .kafka_event import KafkaEvent from .kinesis_firehose_event import KinesisFirehoseEvent from .kinesis_firehose_response import ( - FirehoseStateDropped, - FirehoseStateFailed, - FirehoseStateOk, KinesisFirehoseResponce, KinesisFirehoseResponceFactory, KinesisFirehoseResponceRecord, diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py index f6cf866cd7..7fef723e05 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py @@ -1,14 +1,8 @@ -from __future__ import annotations - import base64 -from typing import Callable, Iterator, List, Optional, Union +from typing import Any, Callable, Dict, Iterator, 
List, Literal, Optional from aws_lambda_powertools.utilities.data_classes.common import DictWrapper -FirehoseStateOk = "Ok" -FirehoseStateDropped = "Dropped" -FirehoseStateFailed = "ProcessingFailed" - class KinesisFirehoseResponseRecordMetadata(DictWrapper): """ @@ -18,18 +12,18 @@ class KinesisFirehoseResponseRecordMetadata(DictWrapper): """ @property - def _metadata(self) -> Optional[dict]: + def _metadata(self) -> Dict: """Optional: metadata associated with this record; present only when Kinesis Stream is source""" return self["metadata"] # could raise KeyError @property - def partition_keys(self) -> Optional[dict[str, str]]: + def partition_keys(self) -> Dict[str, str]: """Kinesis stream partition key; present only when Kinesis Stream is source""" return self._metadata["partitionKeys"] def KinesisFirehoseResponseRecordMetadataFactory( - partition_keys: dict[str, str], + partition_keys: Dict[str, str], json_deserializer: Optional[Callable] = None, ) -> KinesisFirehoseResponseRecordMetadata: data = { @@ -54,7 +48,7 @@ def record_id(self) -> str: return self["recordId"] @property - def result(self) -> Union[FirehoseStateOk, FirehoseStateDropped, FirehoseStateFailed]: + def result(self) -> Literal["Ok", "Dropped", "ProcessingFailed"]: """processing result, supported value: Ok, Dropped, ProcessingFailed""" return self["result"] @@ -79,7 +73,7 @@ def data_as_text(self) -> str: return self.data_as_bytes.decode("utf-8") @property - def data_as_json(self) -> dict: + def data_as_json(self) -> Dict: """Decoded base64-encoded data loaded to json""" if self._json_data is None: self._json_data = self._json_deserializer(self.data_as_text) @@ -88,18 +82,18 @@ def data_as_json(self) -> dict: def KinesisFirehoseResponceRecordFactory( record_id: str, - result: Union[FirehoseStateOk, FirehoseStateDropped, FirehoseStateFailed], + result: Literal["Ok", "Dropped", "ProcessingFailed"], data: str, metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None, json_deserializer: Optional[Callable] = None, ) -> KinesisFirehoseResponceRecord: - pass_data = { + pass_data: Dict[str, Any] = { "recordId": record_id, "result": result, "data": base64.b64encode(data.encode("utf-8")).decode("utf-8"), } if metadata: - data["metadata"] = metadata + pass_data["metadata"] = metadata return KinesisFirehoseResponceRecord(data=pass_data, json_deserializer=json_deserializer) diff --git a/examples/event_sources/src/kinesis_firehose_delivery_stream.py b/examples/event_sources/src/kinesis_firehose_delivery_stream.py index 4b5d3a8721..34fc5d1e28 100644 --- a/examples/event_sources/src/kinesis_firehose_delivery_stream.py +++ b/examples/event_sources/src/kinesis_firehose_delivery_stream.py @@ -1,9 +1,8 @@ import json from aws_lambda_powertools.utilities.data_classes import ( - FirehoseStateOk, KinesisFirehoseEvent, - KinesisFirehoseResponce, + KinesisFirehoseResponceFactory, KinesisFirehoseResponceRecordFactory, event_source, ) @@ -12,7 +11,7 @@ @event_source(data_class=KinesisFirehoseEvent) def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): - result = KinesisFirehoseResponce({}) + result = [] for record in event.records: # if data was delivered as json; caches loaded value @@ -20,11 +19,11 @@ def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): processed_record = KinesisFirehoseResponceRecordFactory( record_id=record.record_id, - result=FirehoseStateOk, + result="Ok", data=(json.dumps(data)), ) - result.add_record(processed_record) + result.append(processed_record) # return transformed 
records - return result + return KinesisFirehoseResponceFactory(records=result) diff --git a/tests/unit/data_classes/test_kinesis_firehose_response.py b/tests/unit/data_classes/test_kinesis_firehose_response.py index 3ed5f0d8db..3e87e2e77c 100644 --- a/tests/unit/data_classes/test_kinesis_firehose_response.py +++ b/tests/unit/data_classes/test_kinesis_firehose_response.py @@ -1,5 +1,4 @@ from aws_lambda_powertools.utilities.data_classes import ( - FirehoseStateOk, KinesisFirehoseEvent, KinesisFirehoseResponceFactory, KinesisFirehoseResponceRecordFactory, @@ -18,7 +17,7 @@ def test_kinesis_firehose_response(): processed_record = KinesisFirehoseResponceRecordFactory( record_id=record.record_id, - result=FirehoseStateOk, + result="Ok", data=(data), ) @@ -29,7 +28,7 @@ def test_kinesis_firehose_response(): assert len(res_records) == 2 record_01, record_02 = res_records[:] record01_raw = raw_event["records"][0] - assert record_01.result == FirehoseStateOk + assert record_01.result == "Ok" assert record_01.record_id == record01_raw["recordId"] assert record_01.data_as_bytes == b"Hello World" assert record_01.data_as_text == "Hello World" From 8c9db3229c188a6c9518d953ccb03a4a73f3c98c Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Wed, 30 Aug 2023 20:47:19 +0000 Subject: [PATCH 03/32] remove deleted const --- aws_lambda_powertools/utilities/data_classes/__init__.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/__init__.py b/aws_lambda_powertools/utilities/data_classes/__init__.py index 6908762530..da573866c7 100644 --- a/aws_lambda_powertools/utilities/data_classes/__init__.py +++ b/aws_lambda_powertools/utilities/data_classes/__init__.py @@ -48,9 +48,6 @@ "KinesisFirehoseResponce", "KinesisFirehoseResponceRecord", "KinesisFirehoseResponseRecordMetadata", - "FirehoseStateOk", - "FirehoseStateDropped", - "FirehoseStateFailed", "KinesisFirehoseResponceFactory", "KinesisFirehoseResponceRecordFactory", "KinesisFirehoseResponseRecordMetadataFactory", From f02319f1f37bd8711b8f7305dcbc298381ce8b16 Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Wed, 30 Aug 2023 21:11:49 +0000 Subject: [PATCH 04/32] fix Literal import in 3.7 --- .../utilities/data_classes/kinesis_firehose_response.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py index 7fef723e05..7f9c014d24 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py @@ -1,5 +1,11 @@ import base64 -from typing import Any, Callable, Dict, Iterator, List, Literal, Optional +import sys +from typing import Any, Callable, Dict, Iterator, List, Optional + +if sys.version_info >= (3, 8): + from typing import Literal +else: + from typing_extensions import Literal from aws_lambda_powertools.utilities.data_classes.common import DictWrapper From 5f55aa75e9c5fe17af01fba71b0b1695bf08e7f1 Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Fri, 1 Sep 2023 01:21:08 +0000 Subject: [PATCH 05/32] change to use data-classes --- .../utilities/data_classes/__init__.py | 18 +-- .../utilities/data_classes/common.py | 2 +- .../data_classes/kinesis_firehose_event.py | 122 ++++++++++++++++- .../data_classes/kinesis_firehose_response.py | 125 ------------------ .../src/kinesis_firehose_delivery_stream.py | 20 +-- .../test_kinesis_firehose_response.py | 56 
++++++-- 6 files changed, 180 insertions(+), 163 deletions(-) delete mode 100644 aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py diff --git a/aws_lambda_powertools/utilities/data_classes/__init__.py b/aws_lambda_powertools/utilities/data_classes/__init__.py index da573866c7..549d7a4a15 100644 --- a/aws_lambda_powertools/utilities/data_classes/__init__.py +++ b/aws_lambda_powertools/utilities/data_classes/__init__.py @@ -14,14 +14,11 @@ from .event_bridge_event import EventBridgeEvent from .event_source import event_source from .kafka_event import KafkaEvent -from .kinesis_firehose_event import KinesisFirehoseEvent -from .kinesis_firehose_response import ( - KinesisFirehoseResponce, - KinesisFirehoseResponceFactory, - KinesisFirehoseResponceRecord, - KinesisFirehoseResponceRecordFactory, +from .kinesis_firehose_event import ( + KinesisFirehoseEvent, + KinesisFirehoseResponse, + KinesisFirehoseResponseRecord, KinesisFirehoseResponseRecordMetadata, - KinesisFirehoseResponseRecordMetadataFactory, ) from .kinesis_stream_event import KinesisStreamEvent from .lambda_function_url_event import LambdaFunctionUrlEvent @@ -45,12 +42,9 @@ "KafkaEvent", "KinesisFirehoseEvent", "KinesisStreamEvent", - "KinesisFirehoseResponce", - "KinesisFirehoseResponceRecord", + "KinesisFirehoseResponse", + "KinesisFirehoseResponseRecord", "KinesisFirehoseResponseRecordMetadata", - "KinesisFirehoseResponceFactory", - "KinesisFirehoseResponceRecordFactory", - "KinesisFirehoseResponseRecordMetadataFactory", "LambdaFunctionUrlEvent", "S3Event", "S3EventBridgeNotificationEvent", diff --git a/aws_lambda_powertools/utilities/data_classes/common.py b/aws_lambda_powertools/utilities/data_classes/common.py index 7a3fc8ab40..2126c27a68 100644 --- a/aws_lambda_powertools/utilities/data_classes/common.py +++ b/aws_lambda_powertools/utilities/data_classes/common.py @@ -15,7 +15,7 @@ class DictWrapper(Mapping): def __init__(self, data: Dict[str, Any], json_deserializer: Optional[Callable] = None): """ - Parameters + ParametersW ---------- data : Dict[str, Any] Lambda Event Source Event payload diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 47dc196856..821c1a09d8 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -1,8 +1,121 @@ import base64 -from typing import Iterator, Optional +import json +import sys +from dataclasses import dataclass +from typing import Any, Callable, Dict, Iterator, List, Optional from aws_lambda_powertools.utilities.data_classes.common import DictWrapper +if sys.version_info >= (3, 8): + from typing import Literal +else: + from typing_extensions import Literal + + +@dataclass +class KinesisFirehoseResponseRecordMetadata: + """ + Documentation: + -------------- + - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html + """ + + partition_keys: Optional[Dict[str, str]] + + @property + def asdict(self) -> Optional[Dict]: + if self.partition_keys is not None: + return {"partitionKeys": self.partition_keys} + return None + + +@dataclass +class KinesisFirehoseResponseRecord: + """Record in Kinesis Data Firehose event + + Documentation: + -------------- + - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html + """ + + """Record ID; uniquely identifies this record within the current batch""" + record_id: str + """processing 
result, supported value: Ok, Dropped, ProcessingFailed""" + result: Literal["Ok", "Dropped", "ProcessingFailed"] + """The data blob, base64-encoded, optional at init""" + data: Optional[str] = None + """Optional: metadata associated with this record; present only when Kinesis Stream is source""" + metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None + """Json data for caching json.dump result""" + _json_data: Optional[Any] = None + json_serializer: Optional[Callable] = json.dumps + json_deserializer: Optional[Callable] = json.loads + + def data_from_byte(self, data: bytes): + """Populate data field using a byte like data""" + self.data = base64.b64encode(data).decode("utf-8") + + def data_from_text(self, data: str): + """Populate data field using a string like data""" + self.data_from_byte(data.encode("utf-8")) + + def data_from_json( + self, + data: Any, + ): + """Populate data field using any structure that could be converted to json""" + self.data_from_text(data=self.json_serializer(data)) + + @property + def asdict(self) -> Dict: + r = { + "recordId": self.record_id, + "result": self.result, + "data": self.data, + } + if self.metadata: + r["metadata"] = self.metadata.asdict + return r + + @property + def data_as_bytes(self) -> bytes: + """Decoded base64-encoded data as bytes""" + return base64.b64decode(self.data) + + @property + def data_as_text(self) -> str: + """Decoded base64-encoded data as text""" + return self.data_as_bytes.decode("utf-8") + + @property + def data_as_json(self) -> Dict: + """Decoded base64-encoded data loaded to json""" + if self._json_data is None: + self._json_data = self.json_deserializer(self.data_as_text) + return self._json_data + + +@dataclass +class KinesisFirehoseResponse: + """Kinesis Data Firehose event + + Documentation: + -------------- + - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html + """ + + records: Optional[List[KinesisFirehoseResponseRecord]] = None + + def add_record(self, record: KinesisFirehoseResponseRecord): + if self.records: + self.records.append(record) + else: + self.records = [record] + + @property + def asdict(self) -> Dict: + return {"records": [r.asdict for r in self.records]} + class KinesisFirehoseRecordMetadata(DictWrapper): @property @@ -77,6 +190,13 @@ def data_as_json(self) -> dict: self._json_data = self._json_deserializer(self.data_as_text) return self._json_data + def create_firehose_response_record( + self, + result: Literal["Ok", "Dropped", "ProcessingFailed"], + data: str = None, + ) -> KinesisFirehoseResponseRecord: + return KinesisFirehoseResponseRecord(record_id=self.record_id, result=result, data=data) + class KinesisFirehoseEvent(DictWrapper): """Kinesis Data Firehose event diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py deleted file mode 100644 index 7f9c014d24..0000000000 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py +++ /dev/null @@ -1,125 +0,0 @@ -import base64 -import sys -from typing import Any, Callable, Dict, Iterator, List, Optional - -if sys.version_info >= (3, 8): - from typing import Literal -else: - from typing_extensions import Literal - -from aws_lambda_powertools.utilities.data_classes.common import DictWrapper - - -class KinesisFirehoseResponseRecordMetadata(DictWrapper): - """ - Documentation: - -------------- - - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html - """ - - @property - 
def _metadata(self) -> Dict: - """Optional: metadata associated with this record; present only when Kinesis Stream is source""" - return self["metadata"] # could raise KeyError - - @property - def partition_keys(self) -> Dict[str, str]: - """Kinesis stream partition key; present only when Kinesis Stream is source""" - return self._metadata["partitionKeys"] - - -def KinesisFirehoseResponseRecordMetadataFactory( - partition_keys: Dict[str, str], - json_deserializer: Optional[Callable] = None, -) -> KinesisFirehoseResponseRecordMetadata: - data = { - "metadata": { - "partitionKeys": partition_keys, - }, - } - return KinesisFirehoseResponseRecordMetadata(data=data, json_deserializer=json_deserializer) - - -class KinesisFirehoseResponceRecord(DictWrapper): - """Record in Kinesis Data Firehose event - - Documentation: - -------------- - - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html - """ - - @property - def record_id(self) -> str: - """Record ID; uniquely identifies this record within the current batch""" - return self["recordId"] - - @property - def result(self) -> Literal["Ok", "Dropped", "ProcessingFailed"]: - """processing result, supported value: Ok, Dropped, ProcessingFailed""" - return self["result"] - - @property - def data(self) -> str: - """The data blob, base64-encoded""" - return self["data"] - - @property - def metadata(self) -> Optional[KinesisFirehoseResponseRecordMetadata]: - """Optional: metadata associated with this record; present only when Kinesis Stream is source""" - return KinesisFirehoseResponseRecordMetadata(self._data) if self.get("metadata") else None - - @property - def data_as_bytes(self) -> bytes: - """Decoded base64-encoded data as bytes""" - return base64.b64decode(self.data) - - @property - def data_as_text(self) -> str: - """Decoded base64-encoded data as text""" - return self.data_as_bytes.decode("utf-8") - - @property - def data_as_json(self) -> Dict: - """Decoded base64-encoded data loaded to json""" - if self._json_data is None: - self._json_data = self._json_deserializer(self.data_as_text) - return self._json_data - - -def KinesisFirehoseResponceRecordFactory( - record_id: str, - result: Literal["Ok", "Dropped", "ProcessingFailed"], - data: str, - metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None, - json_deserializer: Optional[Callable] = None, -) -> KinesisFirehoseResponceRecord: - pass_data: Dict[str, Any] = { - "recordId": record_id, - "result": result, - "data": base64.b64encode(data.encode("utf-8")).decode("utf-8"), - } - if metadata: - pass_data["metadata"] = metadata - return KinesisFirehoseResponceRecord(data=pass_data, json_deserializer=json_deserializer) - - -class KinesisFirehoseResponce(DictWrapper): - """Kinesis Data Firehose event - - Documentation: - -------------- - - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html - """ - - @property - def records(self) -> Iterator[KinesisFirehoseResponceRecord]: - for record in self["records"]: - yield KinesisFirehoseResponceRecord(data=record, json_deserializer=self._json_deserializer) - - -def KinesisFirehoseResponceFactory( - records: List[KinesisFirehoseResponceRecord], - json_deserializer: Optional[Callable] = None, -) -> KinesisFirehoseResponce: - pass_data = {"records": records} - return KinesisFirehoseResponce(data=pass_data, json_deserializer=json_deserializer) diff --git a/examples/event_sources/src/kinesis_firehose_delivery_stream.py b/examples/event_sources/src/kinesis_firehose_delivery_stream.py index 34fc5d1e28..e6db5c772f 
100644 --- a/examples/event_sources/src/kinesis_firehose_delivery_stream.py +++ b/examples/event_sources/src/kinesis_firehose_delivery_stream.py @@ -2,8 +2,7 @@ from aws_lambda_powertools.utilities.data_classes import ( KinesisFirehoseEvent, - KinesisFirehoseResponceFactory, - KinesisFirehoseResponceRecordFactory, + KinesisFirehoseResponse, event_source, ) from aws_lambda_powertools.utilities.typing import LambdaContext @@ -11,19 +10,20 @@ @event_source(data_class=KinesisFirehoseEvent) def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): - result = [] + result = KinesisFirehoseResponse() for record in event.records: # if data was delivered as json; caches loaded value data = record.data_as_json - processed_record = KinesisFirehoseResponceRecordFactory( - record_id=record.record_id, - result="Ok", - data=(json.dumps(data)), - ) + json_data = json.loads(data) - result.append(processed_record) + ## do all kind of stuff with data + + processed_record = record.create_firehose_response_record(result="Ok") + processed_record.data_from_json(data=json_data) + + result.add_record(processed_record) # return transformed records - return KinesisFirehoseResponceFactory(records=result) + return result.asdict diff --git a/tests/unit/data_classes/test_kinesis_firehose_response.py b/tests/unit/data_classes/test_kinesis_firehose_response.py index 3e87e2e77c..9b7024bf06 100644 --- a/tests/unit/data_classes/test_kinesis_firehose_response.py +++ b/tests/unit/data_classes/test_kinesis_firehose_response.py @@ -1,35 +1,63 @@ from aws_lambda_powertools.utilities.data_classes import ( KinesisFirehoseEvent, - KinesisFirehoseResponceFactory, - KinesisFirehoseResponceRecordFactory, + KinesisFirehoseResponse, + KinesisFirehoseResponseRecord, ) from tests.functional.utils import load_event def test_kinesis_firehose_response(): raw_event = load_event("kinesisFirehoseKinesisEvent.json") - parsed_event = KinesisFirehoseEvent(raw_event) + parsed_event = KinesisFirehoseEvent(data=raw_event) - result = [] + response = KinesisFirehoseResponse() for record in parsed_event.records: # if data was delivered as json; caches loaded value data = record.data_as_text - processed_record = KinesisFirehoseResponceRecordFactory( + processed_record = KinesisFirehoseResponseRecord( record_id=record.record_id, result="Ok", - data=(data), ) + processed_record.data_from_text(data=data) + response.add_record(record=processed_record) + response_dict = response.asdict - result.append(processed_record) - response = KinesisFirehoseResponceFactory(result) + res_records = list(response_dict["records"]) + assert len(res_records) == 2 + record_01, record_02 = res_records[:] + record01_raw = raw_event["records"][0] + assert record_01["result"] == "Ok" + assert record_01["recordId"] == record01_raw["recordId"] + assert record_01["data"] == record01_raw["data"] + + assert response.records[0].data_as_bytes == b"Hello World" + assert response.records[0].data_as_text == "Hello World" - res_records = list(response.records) + +def test_kinesis_firehose_create_response(): + raw_event = load_event("kinesisFirehoseKinesisEvent.json") + parsed_event = KinesisFirehoseEvent(data=raw_event) + + response = KinesisFirehoseResponse() + for record in parsed_event.records: + # if data was delivered as json; caches loaded value + data = record.data_as_text + + processed_record = record.create_firehose_response_record( + result="Ok", + ) + processed_record.data_from_text(data=data) + response.add_record(record=processed_record) + response_dict = 
response.asdict + + res_records = list(response_dict["records"]) assert len(res_records) == 2 record_01, record_02 = res_records[:] record01_raw = raw_event["records"][0] - assert record_01.result == "Ok" - assert record_01.record_id == record01_raw["recordId"] - assert record_01.data_as_bytes == b"Hello World" - assert record_01.data_as_text == "Hello World" - assert record_01.data == record01_raw["data"] + assert record_01["result"] == "Ok" + assert record_01["recordId"] == record01_raw["recordId"] + assert record_01["data"] == record01_raw["data"] + + assert response.records[0].data_as_bytes == b"Hello World" + assert response.records[0].data_as_text == "Hello World" From 2566f62e1fd1873842c33267e724d0ff60834cfb Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Fri, 1 Sep 2023 01:36:56 +0000 Subject: [PATCH 06/32] fix mypy --- .../data_classes/kinesis_firehose_event.py | 27 +++++++++++-------- .../src/kinesis_firehose_delivery_stream.py | 6 +---- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 821c1a09d8..55d0fed483 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -48,8 +48,8 @@ class KinesisFirehoseResponseRecord: metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None """Json data for caching json.dump result""" _json_data: Optional[Any] = None - json_serializer: Optional[Callable] = json.dumps - json_deserializer: Optional[Callable] = json.loads + json_serializer: Callable = json.dumps + json_deserializer: Callable = json.loads def data_from_byte(self, data: bytes): """Populate data field using a byte like data""" @@ -59,16 +59,13 @@ def data_from_text(self, data: str): """Populate data field using a string like data""" self.data_from_byte(data.encode("utf-8")) - def data_from_json( - self, - data: Any, - ): + def data_from_json(self, data: Any): """Populate data field using any structure that could be converted to json""" self.data_from_text(data=self.json_serializer(data)) @property def asdict(self) -> Dict: - r = { + r: Dict[str, Any] = { "recordId": self.record_id, "result": self.result, "data": self.data, @@ -78,18 +75,24 @@ def asdict(self) -> Dict: return r @property - def data_as_bytes(self) -> bytes: + def data_as_bytes(self) -> Optional[bytes]: """Decoded base64-encoded data as bytes""" + if not self.data: + return None return base64.b64decode(self.data) @property - def data_as_text(self) -> str: + def data_as_text(self) -> Optional[str]: """Decoded base64-encoded data as text""" + if not self.data_as_bytes: + return None return self.data_as_bytes.decode("utf-8") @property - def data_as_json(self) -> Dict: + def data_as_json(self) -> Optional[Dict]: """Decoded base64-encoded data loaded to json""" + if not self.data_as_text: + return None if self._json_data is None: self._json_data = self.json_deserializer(self.data_as_text) return self._json_data @@ -114,6 +117,8 @@ def add_record(self, record: KinesisFirehoseResponseRecord): @property def asdict(self) -> Dict: + if not self.records: + return {} return {"records": [r.asdict for r in self.records]} @@ -193,7 +198,7 @@ def data_as_json(self) -> dict: def create_firehose_response_record( self, result: Literal["Ok", "Dropped", "ProcessingFailed"], - data: str = None, + data: Optional[str] = None, ) -> KinesisFirehoseResponseRecord: return 
KinesisFirehoseResponseRecord(record_id=self.record_id, result=result, data=data) diff --git a/examples/event_sources/src/kinesis_firehose_delivery_stream.py b/examples/event_sources/src/kinesis_firehose_delivery_stream.py index e6db5c772f..1cb74c8dd7 100644 --- a/examples/event_sources/src/kinesis_firehose_delivery_stream.py +++ b/examples/event_sources/src/kinesis_firehose_delivery_stream.py @@ -1,5 +1,3 @@ -import json - from aws_lambda_powertools.utilities.data_classes import ( KinesisFirehoseEvent, KinesisFirehoseResponse, @@ -16,12 +14,10 @@ def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): # if data was delivered as json; caches loaded value data = record.data_as_json - json_data = json.loads(data) - ## do all kind of stuff with data processed_record = record.create_firehose_response_record(result="Ok") - processed_record.data_from_json(data=json_data) + processed_record.data_from_json(data=data) result.add_record(processed_record) From 636e9d1de1e4ad5af2c67d042910c0f7d3336675 Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Fri, 1 Sep 2023 19:44:31 +0000 Subject: [PATCH 07/32] fix typo, make asdict a function --- .../utilities/data_classes/common.py | 2 +- .../data_classes/kinesis_firehose_event.py | 30 +++++++++++-------- docs/utilities/data_classes.md | 11 +++++++ .../src/kinesis_firehose_delivery_stream.py | 6 ++-- .../src/kinesis_firehose_response.py | 24 +++++++++++++++ .../test_kinesis_firehose_response.py | 8 +++-- 6 files changed, 64 insertions(+), 17 deletions(-) create mode 100644 examples/event_sources/src/kinesis_firehose_response.py diff --git a/aws_lambda_powertools/utilities/data_classes/common.py b/aws_lambda_powertools/utilities/data_classes/common.py index 2126c27a68..7a3fc8ab40 100644 --- a/aws_lambda_powertools/utilities/data_classes/common.py +++ b/aws_lambda_powertools/utilities/data_classes/common.py @@ -15,7 +15,7 @@ class DictWrapper(Mapping): def __init__(self, data: Dict[str, Any], json_deserializer: Optional[Callable] = None): """ - ParametersW + Parameters ---------- data : Dict[str, Any] Lambda Event Source Event payload diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 55d0fed483..398f91e455 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -1,15 +1,11 @@ import base64 import json -import sys from dataclasses import dataclass from typing import Any, Callable, Dict, Iterator, List, Optional -from aws_lambda_powertools.utilities.data_classes.common import DictWrapper +from typing_extensions import Literal -if sys.version_info >= (3, 8): - from typing import Literal -else: - from typing_extensions import Literal +from aws_lambda_powertools.utilities.data_classes.common import DictWrapper @dataclass @@ -22,7 +18,6 @@ class KinesisFirehoseResponseRecordMetadata: partition_keys: Optional[Dict[str, str]] - @property def asdict(self) -> Optional[Dict]: if self.partition_keys is not None: return {"partitionKeys": self.partition_keys} @@ -42,7 +37,8 @@ class KinesisFirehoseResponseRecord: record_id: str """processing result, supported value: Ok, Dropped, ProcessingFailed""" result: Literal["Ok", "Dropped", "ProcessingFailed"] - """The data blob, base64-encoded, optional at init""" + """data blob, base64-encoded, optional at init. 
Allows pass in base64-encoded data directly or + use either function like `data_from_text`, `data_from_json` to populate data""" data: Optional[str] = None """Optional: metadata associated with this record; present only when Kinesis Stream is source""" metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None @@ -63,7 +59,6 @@ def data_from_json(self, data: Any): """Populate data field using any structure that could be converted to json""" self.data_from_text(data=self.json_serializer(data)) - @property def asdict(self) -> Dict: r: Dict[str, Any] = { "recordId": self.record_id, @@ -71,7 +66,7 @@ def asdict(self) -> Dict: "data": self.data, } if self.metadata: - r["metadata"] = self.metadata.asdict + r["metadata"] = self.metadata.asdict() return r @property @@ -115,11 +110,12 @@ def add_record(self, record: KinesisFirehoseResponseRecord): else: self.records = [record] - @property def asdict(self) -> Dict: + # make sure return size is less than 6MB if not self.records: return {} - return {"records": [r.asdict for r in self.records]} + + return {"records": [r.asdict() for r in self.records]} class KinesisFirehoseRecordMetadata(DictWrapper): @@ -200,6 +196,16 @@ def create_firehose_response_record( result: Literal["Ok", "Dropped", "ProcessingFailed"], data: Optional[str] = None, ) -> KinesisFirehoseResponseRecord: + """create a KinesisFirehoseResponseRecord directly using the record_id and given values + Parameters + ---------- + result : Literal["Ok", "Dropped", "ProcessingFailed"] + processing result, supported value: Ok, Dropped, ProcessingFailed + data : str, optional + data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or + use either function like `data_from_text`, `data_from_json` to populate data + + """ return KinesisFirehoseResponseRecord(record_id=self.record_id, result=result, data=data) diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 7b3aa74e27..5cf98d1205 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -981,12 +981,23 @@ inline, and re-emit them back to the Delivery Stream. Similar to Kinesis Data Streams, the events contain base64 encoded data. You can use the helper function to access the data either as json or plain text, depending on the original payload. +When constructing response to Firehose, You can utilize the `KinesisFirehoseResponse` class shown +in the example below. + === "app.py" ```python --8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py" ``` +You can also construct response without using `event_source` wrapper. Shown in the example below. 
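A minimal hand-rolled handler using the `KinesisFirehoseResponse` and `KinesisFirehoseResponseRecord` classes added in this patch could look roughly like the sketch below; the raw-event access and the pass-through payload are illustrative, and it assumes the record payload is UTF-8 text (`data_from_text` expects a `str`, while `data_from_json` accepts any JSON-serializable value).

```python
import base64

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseResponse,
    KinesisFirehoseResponseRecord,
)


def lambda_handler(event, context):
    response = KinesisFirehoseResponse()

    for record in event["records"]:
        # Firehose delivers each payload base64-encoded
        payload = base64.b64decode(record["data"]).decode("utf-8")

        transformed = KinesisFirehoseResponseRecord(record_id=record["recordId"], result="Ok")
        # re-encodes the string payload to base64 before it is handed back to Firehose
        transformed.data_from_text(data=payload)
        response.add_record(transformed)

    # Firehose expects a {"records": [...]} structure back
    return response.asdict()
```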
+ +=== "app.py" + + ```python + --8<-- "examples/event_sources/src/kinesis_firehose_response.py" + ``` + ### Lambda Function URL === "app.py" diff --git a/examples/event_sources/src/kinesis_firehose_delivery_stream.py b/examples/event_sources/src/kinesis_firehose_delivery_stream.py index 1cb74c8dd7..5b5805548d 100644 --- a/examples/event_sources/src/kinesis_firehose_delivery_stream.py +++ b/examples/event_sources/src/kinesis_firehose_delivery_stream.py @@ -15,11 +15,13 @@ def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): data = record.data_as_json ## do all kind of stuff with data + ## generate data to return + new_data = {"tool_used": "powertools_dataclass", "original_payload": data} processed_record = record.create_firehose_response_record(result="Ok") - processed_record.data_from_json(data=data) + processed_record.data_from_json(data=new_data) result.add_record(processed_record) # return transformed records - return result.asdict + return result.asdict() diff --git a/examples/event_sources/src/kinesis_firehose_response.py b/examples/event_sources/src/kinesis_firehose_response.py new file mode 100644 index 0000000000..8817439aef --- /dev/null +++ b/examples/event_sources/src/kinesis_firehose_response.py @@ -0,0 +1,24 @@ +import base64 + +from aws_lambda_powertools.utilities.data_classes import ( + KinesisFirehoseResponse, + KinesisFirehoseResponseRecord, +) + + +def lambda_handler(event, context): + result = KinesisFirehoseResponse() + + for record in event["records"]: + print(record["recordId"]) + payload = base64.b64decode(record["data"]).decode("utf-8") + ## do all kind of stuff with payload + ## generate data to return + new_data = {"tool_used": "powertools_dataclass", "original_payload": payload} + + processed_record = KinesisFirehoseResponseRecord(record_id=record["recordId"], result="Ok") + processed_record.data_from_text(data=new_data) + result.add_record(processed_record) + + # return transformed records + return result.asdict() diff --git a/tests/unit/data_classes/test_kinesis_firehose_response.py b/tests/unit/data_classes/test_kinesis_firehose_response.py index 9b7024bf06..ea56a4dcf9 100644 --- a/tests/unit/data_classes/test_kinesis_firehose_response.py +++ b/tests/unit/data_classes/test_kinesis_firehose_response.py @@ -2,6 +2,7 @@ KinesisFirehoseEvent, KinesisFirehoseResponse, KinesisFirehoseResponseRecord, + KinesisFirehoseResponseRecordMetadata, ) from tests.functional.utils import load_event @@ -15,13 +16,15 @@ def test_kinesis_firehose_response(): # if data was delivered as json; caches loaded value data = record.data_as_text + metadata_partition = KinesisFirehoseResponseRecordMetadata(partition_keys={"year": 2023}) processed_record = KinesisFirehoseResponseRecord( record_id=record.record_id, result="Ok", + metadata=metadata_partition, ) processed_record.data_from_text(data=data) response.add_record(record=processed_record) - response_dict = response.asdict + response_dict = response.asdict() res_records = list(response_dict["records"]) assert len(res_records) == 2 @@ -30,6 +33,7 @@ def test_kinesis_firehose_response(): assert record_01["result"] == "Ok" assert record_01["recordId"] == record01_raw["recordId"] assert record_01["data"] == record01_raw["data"] + assert record_01["metadata"]["partitionKeys"]["year"] == 2023 assert response.records[0].data_as_bytes == b"Hello World" assert response.records[0].data_as_text == "Hello World" @@ -49,7 +53,7 @@ def test_kinesis_firehose_create_response(): ) processed_record.data_from_text(data=data) 
response.add_record(record=processed_record) - response_dict = response.asdict + response_dict = response.asdict() res_records = list(response_dict["records"]) assert len(res_records) == 2 From 370e1562ffa2d74b48a3b23aae52acc57eea7c45 Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Tue, 5 Sep 2023 19:24:31 +0000 Subject: [PATCH 08/32] address Troy/Leandro suggestions --- .../data_classes/kinesis_firehose_event.py | 24 ++++++++++++++----- .../test_kinesis_firehose_response.py | 4 +++- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 398f91e455..81a234a13c 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -26,7 +26,7 @@ def asdict(self) -> Optional[Dict]: @dataclass class KinesisFirehoseResponseRecord: - """Record in Kinesis Data Firehose event + """Record in Kinesis Data Firehose response object Documentation: -------------- @@ -40,7 +40,10 @@ class KinesisFirehoseResponseRecord: """data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or use either function like `data_from_text`, `data_from_json` to populate data""" data: Optional[str] = None - """Optional: metadata associated with this record; present only when Kinesis Stream is source""" + """ + Optional: Metadata associated with this record; can contain partition keys + - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html + """ metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None """Json data for caching json.dump result""" _json_data: Optional[Any] = None @@ -95,11 +98,17 @@ def data_as_json(self) -> Optional[Dict]: @dataclass class KinesisFirehoseResponse: - """Kinesis Data Firehose event + """Kinesis Data Firehose response object Documentation: -------------- - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html + + Parameters + ---------- + records : List[KinesisFirehoseResponseRecord] + records of Kinesis Data Firehose response object, + optional parameter at start. can be added later using `add_record` function. """ records: Optional[List[KinesisFirehoseResponseRecord]] = None @@ -113,7 +122,7 @@ def add_record(self, record: KinesisFirehoseResponseRecord): def asdict(self) -> Dict: # make sure return size is less than 6MB if not self.records: - return {} + raise ValueError("Kinesis Firehose doesn't accept empyt response") return {"records": [r.asdict() for r in self.records]} @@ -195,6 +204,7 @@ def create_firehose_response_record( self, result: Literal["Ok", "Dropped", "ProcessingFailed"], data: Optional[str] = None, + metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None, ) -> KinesisFirehoseResponseRecord: """create a KinesisFirehoseResponseRecord directly using the record_id and given values Parameters @@ -204,9 +214,11 @@ def create_firehose_response_record( data : str, optional data blob, base64-encoded, optional at init. 
Allows pass in base64-encoded data directly or use either function like `data_from_text`, `data_from_json` to populate data - + metadata: KinesisFirehoseResponseRecordMetadata, optional + Metadata associated with this record; can contain partition keys + - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html """ - return KinesisFirehoseResponseRecord(record_id=self.record_id, result=result, data=data) + return KinesisFirehoseResponseRecord(record_id=self.record_id, result=result, data=data, metadata=metadata) class KinesisFirehoseEvent(DictWrapper): diff --git a/tests/unit/data_classes/test_kinesis_firehose_response.py b/tests/unit/data_classes/test_kinesis_firehose_response.py index ea56a4dcf9..0227e00369 100644 --- a/tests/unit/data_classes/test_kinesis_firehose_response.py +++ b/tests/unit/data_classes/test_kinesis_firehose_response.py @@ -47,9 +47,10 @@ def test_kinesis_firehose_create_response(): for record in parsed_event.records: # if data was delivered as json; caches loaded value data = record.data_as_text - + metadata_partition = KinesisFirehoseResponseRecordMetadata(partition_keys={"year": 2023}) processed_record = record.create_firehose_response_record( result="Ok", + metadata=metadata_partition, ) processed_record.data_from_text(data=data) response.add_record(record=processed_record) @@ -62,6 +63,7 @@ def test_kinesis_firehose_create_response(): assert record_01["result"] == "Ok" assert record_01["recordId"] == record01_raw["recordId"] assert record_01["data"] == record01_raw["data"] + assert record_01["metadata"]["partitionKeys"]["year"] == 2023 assert response.records[0].data_as_bytes == b"Hello World" assert response.records[0].data_as_text == "Hello World" From af1abfe5815eca2f1a8e6183561ede6064e1f0cb Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Wed, 6 Sep 2023 17:03:18 +0000 Subject: [PATCH 09/32] remove 6MB comment --- .../utilities/data_classes/kinesis_firehose_event.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 81a234a13c..af9143970f 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -120,9 +120,8 @@ def add_record(self, record: KinesisFirehoseResponseRecord): self.records = [record] def asdict(self) -> Dict: - # make sure return size is less than 6MB if not self.records: - raise ValueError("Kinesis Firehose doesn't accept empyt response") + raise ValueError("Kinesis Firehose doesn't accept empty response") return {"records": [r.asdict() for r in self.records]} From 312830b13d34a93c0a6e75ea0f9b503d72f9bece Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Thu, 7 Sep 2023 16:14:07 +0000 Subject: [PATCH 10/32] fix comments --- .../data_classes/kinesis_firehose_event.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index af9143970f..7281e55b95 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -33,19 +33,16 @@ class KinesisFirehoseResponseRecord: - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html """ - """Record ID; uniquely identifies this record within 
the current batch""" + # Record ID; uniquely identifies this record within the current batch""" record_id: str - """processing result, supported value: Ok, Dropped, ProcessingFailed""" + # Processing result, supported value: Ok, Dropped, ProcessingFailed""" result: Literal["Ok", "Dropped", "ProcessingFailed"] - """data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or - use either function like `data_from_text`, `data_from_json` to populate data""" + # data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or + # use either function like `data_from_text`, `data_from_json` to populate data""" data: Optional[str] = None - """ - Optional: Metadata associated with this record; can contain partition keys - - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html - """ + # Optional: Metadata associated with this record; can contain partition keys + # See - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None - """Json data for caching json.dump result""" _json_data: Optional[Any] = None json_serializer: Callable = json.dumps json_deserializer: Callable = json.loads From a3ed9f95c056733f17e02704a463c745e20b6e84 Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Thu, 7 Sep 2023 19:01:04 +0000 Subject: [PATCH 11/32] address Heitor's suggestion --- .../utilities/data_classes/__init__.py | 12 ++-- .../data_classes/kinesis_firehose_event.py | 56 ++++++++++--------- .../src/kinesis_firehose_delivery_stream.py | 6 +- .../src/kinesis_firehose_response.py | 8 +-- .../test_kinesis_firehose_response.py | 18 +++--- 5 files changed, 51 insertions(+), 49 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/__init__.py b/aws_lambda_powertools/utilities/data_classes/__init__.py index 549d7a4a15..6144f30613 100644 --- a/aws_lambda_powertools/utilities/data_classes/__init__.py +++ b/aws_lambda_powertools/utilities/data_classes/__init__.py @@ -15,10 +15,10 @@ from .event_source import event_source from .kafka_event import KafkaEvent from .kinesis_firehose_event import ( + KinesisFirehoseDataTransformationRecord, + KinesisFirehoseDataTransformationRecordMetadata, + KinesisFirehoseDataTransformationResponse, KinesisFirehoseEvent, - KinesisFirehoseResponse, - KinesisFirehoseResponseRecord, - KinesisFirehoseResponseRecordMetadata, ) from .kinesis_stream_event import KinesisStreamEvent from .lambda_function_url_event import LambdaFunctionUrlEvent @@ -42,9 +42,9 @@ "KafkaEvent", "KinesisFirehoseEvent", "KinesisStreamEvent", - "KinesisFirehoseResponse", - "KinesisFirehoseResponseRecord", - "KinesisFirehoseResponseRecordMetadata", + "KinesisFirehoseDataTransformationResponse", + "KinesisFirehoseDataTransformationRecord", + "KinesisFirehoseDataTransformationRecordMetadata", "LambdaFunctionUrlEvent", "S3Event", "S3EventBridgeNotificationEvent", diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 7281e55b95..1b472494a6 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -1,6 +1,6 @@ import base64 import json -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Any, Callable, Dict, Iterator, List, Optional from typing_extensions import Literal @@ -9,7 +9,7 @@ @dataclass -class 
KinesisFirehoseResponseRecordMetadata: +class KinesisFirehoseDataTransformationRecordMetadata: """ Documentation: -------------- @@ -18,14 +18,14 @@ class KinesisFirehoseResponseRecordMetadata: partition_keys: Optional[Dict[str, str]] - def asdict(self) -> Optional[Dict]: + def asdict(self) -> Dict: if self.partition_keys is not None: return {"partitionKeys": self.partition_keys} - return None + return {} @dataclass -class KinesisFirehoseResponseRecord: +class KinesisFirehoseDataTransformationRecord: """Record in Kinesis Data Firehose response object Documentation: @@ -36,13 +36,13 @@ class KinesisFirehoseResponseRecord: # Record ID; uniquely identifies this record within the current batch""" record_id: str # Processing result, supported value: Ok, Dropped, ProcessingFailed""" - result: Literal["Ok", "Dropped", "ProcessingFailed"] + result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok" # data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or # use either function like `data_from_text`, `data_from_json` to populate data""" data: Optional[str] = None # Optional: Metadata associated with this record; can contain partition keys # See - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html - metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None + metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None _json_data: Optional[Any] = None json_serializer: Callable = json.dumps json_deserializer: Callable = json.loads @@ -70,31 +70,31 @@ def asdict(self) -> Dict: return r @property - def data_as_bytes(self) -> Optional[bytes]: + def data_as_bytes(self) -> bytes: """Decoded base64-encoded data as bytes""" if not self.data: - return None + return b"" return base64.b64decode(self.data) @property - def data_as_text(self) -> Optional[str]: + def data_as_text(self) -> str: """Decoded base64-encoded data as text""" - if not self.data_as_bytes: - return None + if not self.data: + return "" return self.data_as_bytes.decode("utf-8") @property - def data_as_json(self) -> Optional[Dict]: + def data_as_json(self) -> Dict: """Decoded base64-encoded data loaded to json""" - if not self.data_as_text: - return None + if not self.data: + return {} if self._json_data is None: self._json_data = self.json_deserializer(self.data_as_text) return self._json_data @dataclass -class KinesisFirehoseResponse: +class KinesisFirehoseDataTransformationResponse: """Kinesis Data Firehose response object Documentation: @@ -108,13 +108,10 @@ class KinesisFirehoseResponse: optional parameter at start. can be added later using `add_record` function. 
""" - records: Optional[List[KinesisFirehoseResponseRecord]] = None + records: List[KinesisFirehoseDataTransformationRecord] = field(default_factory=list) - def add_record(self, record: KinesisFirehoseResponseRecord): - if self.records: - self.records.append(record) - else: - self.records = [record] + def add_record(self, record: KinesisFirehoseDataTransformationRecord): + self.records.append(record) def asdict(self) -> Dict: if not self.records: @@ -196,12 +193,12 @@ def data_as_json(self) -> dict: self._json_data = self._json_deserializer(self.data_as_text) return self._json_data - def create_firehose_response_record( + def build_data_transformation_response( self, - result: Literal["Ok", "Dropped", "ProcessingFailed"], + result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok", data: Optional[str] = None, - metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None, - ) -> KinesisFirehoseResponseRecord: + metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None, + ) -> KinesisFirehoseDataTransformationRecord: """create a KinesisFirehoseResponseRecord directly using the record_id and given values Parameters ---------- @@ -214,7 +211,12 @@ def create_firehose_response_record( Metadata associated with this record; can contain partition keys - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html """ - return KinesisFirehoseResponseRecord(record_id=self.record_id, result=result, data=data, metadata=metadata) + return KinesisFirehoseDataTransformationRecord( + record_id=self.record_id, + result=result, + data=data, + metadata=metadata, + ) class KinesisFirehoseEvent(DictWrapper): diff --git a/examples/event_sources/src/kinesis_firehose_delivery_stream.py b/examples/event_sources/src/kinesis_firehose_delivery_stream.py index 5b5805548d..97e5ce4036 100644 --- a/examples/event_sources/src/kinesis_firehose_delivery_stream.py +++ b/examples/event_sources/src/kinesis_firehose_delivery_stream.py @@ -1,6 +1,6 @@ from aws_lambda_powertools.utilities.data_classes import ( + KinesisFirehoseDataTransformationResponse, KinesisFirehoseEvent, - KinesisFirehoseResponse, event_source, ) from aws_lambda_powertools.utilities.typing import LambdaContext @@ -8,7 +8,7 @@ @event_source(data_class=KinesisFirehoseEvent) def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): - result = KinesisFirehoseResponse() + result = KinesisFirehoseDataTransformationResponse() for record in event.records: # if data was delivered as json; caches loaded value @@ -18,7 +18,7 @@ def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): ## generate data to return new_data = {"tool_used": "powertools_dataclass", "original_payload": data} - processed_record = record.create_firehose_response_record(result="Ok") + processed_record = record.build_data_transformation_response(result="Ok") processed_record.data_from_json(data=new_data) result.add_record(processed_record) diff --git a/examples/event_sources/src/kinesis_firehose_response.py b/examples/event_sources/src/kinesis_firehose_response.py index 8817439aef..b3725ca935 100644 --- a/examples/event_sources/src/kinesis_firehose_response.py +++ b/examples/event_sources/src/kinesis_firehose_response.py @@ -1,13 +1,13 @@ import base64 from aws_lambda_powertools.utilities.data_classes import ( - KinesisFirehoseResponse, - KinesisFirehoseResponseRecord, + KinesisFirehoseDataTransformationRecord, + KinesisFirehoseDataTransformationResponse, ) def lambda_handler(event, context): - result = KinesisFirehoseResponse() + 
result = KinesisFirehoseDataTransformationResponse() for record in event["records"]: print(record["recordId"]) @@ -16,7 +16,7 @@ def lambda_handler(event, context): ## generate data to return new_data = {"tool_used": "powertools_dataclass", "original_payload": payload} - processed_record = KinesisFirehoseResponseRecord(record_id=record["recordId"], result="Ok") + processed_record = KinesisFirehoseDataTransformationRecord(record_id=record["recordId"], result="Ok") processed_record.data_from_text(data=new_data) result.add_record(processed_record) diff --git a/tests/unit/data_classes/test_kinesis_firehose_response.py b/tests/unit/data_classes/test_kinesis_firehose_response.py index 0227e00369..dced3de4a3 100644 --- a/tests/unit/data_classes/test_kinesis_firehose_response.py +++ b/tests/unit/data_classes/test_kinesis_firehose_response.py @@ -1,8 +1,8 @@ from aws_lambda_powertools.utilities.data_classes import ( + KinesisFirehoseDataTransformationRecord, + KinesisFirehoseDataTransformationRecordMetadata, + KinesisFirehoseDataTransformationResponse, KinesisFirehoseEvent, - KinesisFirehoseResponse, - KinesisFirehoseResponseRecord, - KinesisFirehoseResponseRecordMetadata, ) from tests.functional.utils import load_event @@ -11,13 +11,13 @@ def test_kinesis_firehose_response(): raw_event = load_event("kinesisFirehoseKinesisEvent.json") parsed_event = KinesisFirehoseEvent(data=raw_event) - response = KinesisFirehoseResponse() + response = KinesisFirehoseDataTransformationResponse() for record in parsed_event.records: # if data was delivered as json; caches loaded value data = record.data_as_text - metadata_partition = KinesisFirehoseResponseRecordMetadata(partition_keys={"year": 2023}) - processed_record = KinesisFirehoseResponseRecord( + metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": 2023}) + processed_record = KinesisFirehoseDataTransformationRecord( record_id=record.record_id, result="Ok", metadata=metadata_partition, @@ -43,12 +43,12 @@ def test_kinesis_firehose_create_response(): raw_event = load_event("kinesisFirehoseKinesisEvent.json") parsed_event = KinesisFirehoseEvent(data=raw_event) - response = KinesisFirehoseResponse() + response = KinesisFirehoseDataTransformationResponse() for record in parsed_event.records: # if data was delivered as json; caches loaded value data = record.data_as_text - metadata_partition = KinesisFirehoseResponseRecordMetadata(partition_keys={"year": 2023}) - processed_record = record.create_firehose_response_record( + metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": 2023}) + processed_record = record.build_data_transformation_response( result="Ok", metadata=metadata_partition, ) From bfbee603f0fab6d9b07f05d350a540bb600e96df Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Thu, 7 Sep 2023 19:08:41 +0000 Subject: [PATCH 12/32] data class default optimization --- .../utilities/data_classes/kinesis_firehose_event.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 1b472494a6..0251ee59dd 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -8,7 +8,7 @@ from aws_lambda_powertools.utilities.data_classes.common import DictWrapper -@dataclass +@dataclass(repr=False, order=False, slots=True, frozen=True) 
class KinesisFirehoseDataTransformationRecordMetadata: """ Documentation: @@ -24,7 +24,7 @@ def asdict(self) -> Dict: return {} -@dataclass +@dataclass(repr=False, order=False, slots=True) class KinesisFirehoseDataTransformationRecord: """Record in Kinesis Data Firehose response object @@ -93,7 +93,7 @@ def data_as_json(self) -> Dict: return self._json_data -@dataclass +@dataclass(repr=False, order=False, slots=True) class KinesisFirehoseDataTransformationResponse: """Kinesis Data Firehose response object From 4016446050709eec95a114b965e508dcfe215221 Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Thu, 7 Sep 2023 19:30:44 +0000 Subject: [PATCH 13/32] remove slot for static check --- .../utilities/data_classes/kinesis_firehose_event.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 0251ee59dd..cf551edf85 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -8,7 +8,7 @@ from aws_lambda_powertools.utilities.data_classes.common import DictWrapper -@dataclass(repr=False, order=False, slots=True, frozen=True) +@dataclass(repr=False, order=False, frozen=True) class KinesisFirehoseDataTransformationRecordMetadata: """ Documentation: @@ -24,7 +24,7 @@ def asdict(self) -> Dict: return {} -@dataclass(repr=False, order=False, slots=True) +@dataclass(repr=False, order=False) class KinesisFirehoseDataTransformationRecord: """Record in Kinesis Data Firehose response object @@ -93,7 +93,7 @@ def data_as_json(self) -> Dict: return self._json_data -@dataclass(repr=False, order=False, slots=True) +@dataclass(repr=False, order=False) class KinesisFirehoseDataTransformationResponse: """Kinesis Data Firehose response object From 5dbb3ff05709cd17a59db8ea48999771d6af0a0d Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Thu, 7 Sep 2023 19:50:57 +0000 Subject: [PATCH 14/32] fix doc, example --- docs/utilities/data_classes.md | 2 +- examples/event_sources/src/kinesis_firehose_response.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 5cf98d1205..7d86cca26e 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -981,7 +981,7 @@ inline, and re-emit them back to the Delivery Stream. Similar to Kinesis Data Streams, the events contain base64 encoded data. You can use the helper function to access the data either as json or plain text, depending on the original payload. -When constructing response to Firehose, You can utilize the `KinesisFirehoseResponse` class shown +When constructing response to Firehose, You can utilize the `KinesisFirehoseDataTransformationResponse` class shown in the example below. 
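For reference, each transformed record serialized by `asdict()` carries a `recordId`, a `result` of `Ok`, `Dropped`, or `ProcessingFailed`, the base64-encoded `data`, and, when set, `metadata` with partition keys. A minimal sketch of the payload handed back to Firehose — the record ID and base64 value below are illustrative placeholders:

```python
# Sketch of the serialized response returned to Kinesis Data Firehose.
# "sample_record" and the base64 payload are illustrative placeholders.
response_payload = {
    "records": [
        {
            "recordId": "sample_record",
            "result": "Ok",  # or "Dropped" / "ProcessingFailed"
            "data": "SGVsbG8gV29ybGQ=",  # base64-encoded transformed data
            "metadata": {"partitionKeys": {"year": "2023"}},  # present only when metadata is set
        },
    ],
}
```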
=== "app.py" diff --git a/examples/event_sources/src/kinesis_firehose_response.py b/examples/event_sources/src/kinesis_firehose_response.py index b3725ca935..8039dade45 100644 --- a/examples/event_sources/src/kinesis_firehose_response.py +++ b/examples/event_sources/src/kinesis_firehose_response.py @@ -17,7 +17,7 @@ def lambda_handler(event, context): new_data = {"tool_used": "powertools_dataclass", "original_payload": payload} processed_record = KinesisFirehoseDataTransformationRecord(record_id=record["recordId"], result="Ok") - processed_record.data_from_text(data=new_data) + processed_record.data_from_json(data=new_data) result.add_record(processed_record) # return transformed records From e4d75d768772f959745d467bd37945f0fd7957e7 Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Mon, 11 Sep 2023 21:32:05 +0000 Subject: [PATCH 15/32] rename r->record --- .../utilities/data_classes/kinesis_firehose_event.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index cf551edf85..5a1affa233 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -60,14 +60,14 @@ def data_from_json(self, data: Any): self.data_from_text(data=self.json_serializer(data)) def asdict(self) -> Dict: - r: Dict[str, Any] = { + record: Dict[str, Any] = { "recordId": self.record_id, "result": self.result, "data": self.data, } if self.metadata: - r["metadata"] = self.metadata.asdict() - return r + record["metadata"] = self.metadata.asdict() + return record @property def data_as_bytes(self) -> bytes: @@ -117,7 +117,7 @@ def asdict(self) -> Dict: if not self.records: raise ValueError("Kinesis Firehose doesn't accept empty response") - return {"records": [r.asdict() for r in self.records]} + return {"records": [record.asdict() for record in self.records]} class KinesisFirehoseRecordMetadata(DictWrapper): From ce6ed612de88a1e71dbd63a57fe5d2feb7c510b6 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 12 Sep 2023 16:08:11 +0100 Subject: [PATCH 16/32] Addressing Heitor's feedback --- aws_lambda_powertools/shared/__init__.py | 1 + .../data_classes/kinesis_firehose_event.py | 56 +++++++++---------- .../utilities/serialization.py | 43 ++++++++++++++ .../src/kinesis_firehose_response.py | 28 ++++++---- .../test_kinesis_firehose_response.py | 34 +++++------ 5 files changed, 106 insertions(+), 56 deletions(-) create mode 100644 aws_lambda_powertools/utilities/serialization.py diff --git a/aws_lambda_powertools/shared/__init__.py b/aws_lambda_powertools/shared/__init__.py index e69de29bb2..d68e37349b 100644 --- a/aws_lambda_powertools/shared/__init__.py +++ b/aws_lambda_powertools/shared/__init__.py @@ -0,0 +1 @@ +"""Internal shared functions. 
Do not rely on it besides internal usage.""" diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 5a1affa233..19dc22bcb1 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -16,48 +16,46 @@ class KinesisFirehoseDataTransformationRecordMetadata: - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html """ - partition_keys: Optional[Dict[str, str]] - - def asdict(self) -> Dict: - if self.partition_keys is not None: - return {"partitionKeys": self.partition_keys} - return {} + partition_keys: Dict[str, Any] = field(default_factory=lambda: {}) @dataclass(repr=False, order=False) class KinesisFirehoseDataTransformationRecord: - """Record in Kinesis Data Firehose response object + """Record in Kinesis Data Firehose response object. + + Parameters + ---------- + record_id: str + uniquely identifies this record within the current batch + result: Literal["Ok", "Dropped", "ProcessingFailed"] + record data transformation status, whether it succeeded, should be dropped, or failed. + data: str + base64-encoded payload, by default empty string. + + Use `data_from_text` or `data_from_json` methods to convert data if needed. + + metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] + Metadata associated with this record; can contain partition keys. + + See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html + json_serializer: Callable + function to serialize `obj` to a JSON formatted `str`, by default json.dumps + json_deserializer: Callable + function to deserialize `str`, `bytes`, bytearray` containing a JSON document to a Python `obj`, + by default json.loads Documentation: -------------- - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html """ - # Record ID; uniquely identifies this record within the current batch""" record_id: str - # Processing result, supported value: Ok, Dropped, ProcessingFailed""" result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok" - # data blob, base64-encoded, optional at init. 
Allows pass in base64-encoded data directly or - # use either function like `data_from_text`, `data_from_json` to populate data""" - data: Optional[str] = None - # Optional: Metadata associated with this record; can contain partition keys - # See - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html + data: str = "" metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None - _json_data: Optional[Any] = None json_serializer: Callable = json.dumps json_deserializer: Callable = json.loads - - def data_from_byte(self, data: bytes): - """Populate data field using a byte like data""" - self.data = base64.b64encode(data).decode("utf-8") - - def data_from_text(self, data: str): - """Populate data field using a string like data""" - self.data_from_byte(data.encode("utf-8")) - - def data_from_json(self, data: Any): - """Populate data field using any structure that could be converted to json""" - self.data_from_text(data=self.json_serializer(data)) + _json_data: Optional[Any] = None def asdict(self) -> Dict: record: Dict[str, Any] = { @@ -66,7 +64,7 @@ def asdict(self) -> Dict: "data": self.data, } if self.metadata: - record["metadata"] = self.metadata.asdict() + record["metadata"] = self.metadata.__dict__ return record @property @@ -196,7 +194,7 @@ def data_as_json(self) -> dict: def build_data_transformation_response( self, result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok", - data: Optional[str] = None, + data: str = "", metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None, ) -> KinesisFirehoseDataTransformationRecord: """create a KinesisFirehoseResponseRecord directly using the record_id and given values diff --git a/aws_lambda_powertools/utilities/serialization.py b/aws_lambda_powertools/utilities/serialization.py new file mode 100644 index 0000000000..1242ab77f4 --- /dev/null +++ b/aws_lambda_powertools/utilities/serialization.py @@ -0,0 +1,43 @@ +"""Standalone functions to serialize/deserialize common data structures""" +import base64 +import json +from typing import Any, Callable + + +def base64_decode(data: str) -> str: + """Decodes a Base64-encoded string and returns the decoded value. + + Parameters + ---------- + data: str + The Base64-encoded string to decode. + + Returns + ------- + str + The decoded string value. + """ + return base64.b64decode(data).decode("utf-8") + + +def base64_from_str(data: str) -> str: + """Encode str as base64 string""" + return base64.b64encode(data.encode()).decode("utf-8") + + +def base64_from_json(data: Any, json_serializer: Callable[..., str] = json.dumps) -> str: + """Encode JSON serializable data as base64 string + + Parameters + ---------- + data: Any + JSON serializable (dict, list, boolean, etc.) 
+ json_serializer: Callable + function to serialize `obj` to a JSON formatted `str`, by default json.dumps + + Returns + ------- + str: + JSON string as base64 string + """ + return base64_from_str(data=json_serializer(data)) diff --git a/examples/event_sources/src/kinesis_firehose_response.py b/examples/event_sources/src/kinesis_firehose_response.py index 8039dade45..ef60f4cabf 100644 --- a/examples/event_sources/src/kinesis_firehose_response.py +++ b/examples/event_sources/src/kinesis_firehose_response.py @@ -1,23 +1,29 @@ -import base64 - from aws_lambda_powertools.utilities.data_classes import ( - KinesisFirehoseDataTransformationRecord, KinesisFirehoseDataTransformationResponse, + KinesisFirehoseEvent, + event_source, ) +from aws_lambda_powertools.utilities.serialization import base64_from_json +from aws_lambda_powertools.utilities.typing import LambdaContext -def lambda_handler(event, context): +@event_source(data_class=KinesisFirehoseEvent) +def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): result = KinesisFirehoseDataTransformationResponse() - for record in event["records"]: - print(record["recordId"]) - payload = base64.b64decode(record["data"]).decode("utf-8") - ## do all kind of stuff with payload + for record in event.records: + # get original data using data_as_text property + data = record.data_as_text + + ## do all kind of stuff with data ## generate data to return - new_data = {"tool_used": "powertools_dataclass", "original_payload": payload} + transformed_data = {"new_data": "transformed data using Powertools", "original_payload": data} + + processed_record = record.build_data_transformation_response( + result="Ok", + data=base64_from_json(transformed_data), + ) - processed_record = KinesisFirehoseDataTransformationRecord(record_id=record["recordId"], result="Ok") - processed_record.data_from_json(data=new_data) result.add_record(processed_record) # return transformed records diff --git a/tests/unit/data_classes/test_kinesis_firehose_response.py b/tests/unit/data_classes/test_kinesis_firehose_response.py index dced3de4a3..abf730ea42 100644 --- a/tests/unit/data_classes/test_kinesis_firehose_response.py +++ b/tests/unit/data_classes/test_kinesis_firehose_response.py @@ -8,35 +8,37 @@ def test_kinesis_firehose_response(): + # GIVEN a Kinesis Firehose Event with two records raw_event = load_event("kinesisFirehoseKinesisEvent.json") parsed_event = KinesisFirehoseEvent(data=raw_event) + # WHEN we create a Data Transformation Response without changing the data response = KinesisFirehoseDataTransformationResponse() for record in parsed_event.records: - # if data was delivered as json; caches loaded value - data = record.data_as_text - metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": 2023}) processed_record = KinesisFirehoseDataTransformationRecord( record_id=record.record_id, - result="Ok", metadata=metadata_partition, + data=record.data, ) - processed_record.data_from_text(data=data) response.add_record(record=processed_record) - response_dict = response.asdict() - res_records = list(response_dict["records"]) - assert len(res_records) == 2 - record_01, record_02 = res_records[:] - record01_raw = raw_event["records"][0] - assert record_01["result"] == "Ok" - assert record_01["recordId"] == record01_raw["recordId"] - assert record_01["data"] == record01_raw["data"] - assert record_01["metadata"]["partitionKeys"]["year"] == 2023 + # THEN we should have the same record data + record_01, record_02 = 
response.records[0], response.records[1] + raw_record_01, raw_record_02 = raw_event["records"][0], raw_event["records"][1] - assert response.records[0].data_as_bytes == b"Hello World" - assert response.records[0].data_as_text == "Hello World" + assert len(response.records) == 2 + + assert record_01.result == "Ok" + assert record_02.result == "Ok" + + assert record_01.record_id == raw_record_01["recordId"] + assert record_02.record_id == raw_record_02["recordId"] + + assert record_01.data == raw_record_01["data"] + assert record_02.data == raw_record_02["data"] + + assert record_01.metadata.partition_keys["year"] == 2023 def test_kinesis_firehose_create_response(): From d8be53e090665eda2eb3774410b2566c9e3784da Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 12 Sep 2023 16:12:39 +0100 Subject: [PATCH 17/32] Addressing Heitor's feedback --- .../src/kinesis_firehose_delivery_stream.py | 13 +++++++---- .../src/kinesis_firehose_response.py | 23 +++++++++---------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/examples/event_sources/src/kinesis_firehose_delivery_stream.py b/examples/event_sources/src/kinesis_firehose_delivery_stream.py index 97e5ce4036..ef60f4cabf 100644 --- a/examples/event_sources/src/kinesis_firehose_delivery_stream.py +++ b/examples/event_sources/src/kinesis_firehose_delivery_stream.py @@ -3,6 +3,7 @@ KinesisFirehoseEvent, event_source, ) +from aws_lambda_powertools.utilities.serialization import base64_from_json from aws_lambda_powertools.utilities.typing import LambdaContext @@ -11,15 +12,17 @@ def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): result = KinesisFirehoseDataTransformationResponse() for record in event.records: - # if data was delivered as json; caches loaded value - data = record.data_as_json + # get original data using data_as_text property + data = record.data_as_text ## do all kind of stuff with data ## generate data to return - new_data = {"tool_used": "powertools_dataclass", "original_payload": data} + transformed_data = {"new_data": "transformed data using Powertools", "original_payload": data} - processed_record = record.build_data_transformation_response(result="Ok") - processed_record.data_from_json(data=new_data) + processed_record = record.build_data_transformation_response( + result="Ok", + data=base64_from_json(transformed_data), + ) result.add_record(processed_record) diff --git a/examples/event_sources/src/kinesis_firehose_response.py b/examples/event_sources/src/kinesis_firehose_response.py index ef60f4cabf..093a6f378e 100644 --- a/examples/event_sources/src/kinesis_firehose_response.py +++ b/examples/event_sources/src/kinesis_firehose_response.py @@ -1,29 +1,28 @@ +import base64 + from aws_lambda_powertools.utilities.data_classes import ( + KinesisFirehoseDataTransformationRecord, KinesisFirehoseDataTransformationResponse, - KinesisFirehoseEvent, - event_source, ) from aws_lambda_powertools.utilities.serialization import base64_from_json from aws_lambda_powertools.utilities.typing import LambdaContext -@event_source(data_class=KinesisFirehoseEvent) -def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): +def lambda_handler(event: dict, context: LambdaContext): result = KinesisFirehoseDataTransformationResponse() - for record in event.records: - # get original data using data_as_text property - data = record.data_as_text - - ## do all kind of stuff with data + for record in event["records"]: + print(record["recordId"]) + payload = 
base64.b64decode(record["data"]).decode("utf-8") + ## do all kind of stuff with payload ## generate data to return - transformed_data = {"new_data": "transformed data using Powertools", "original_payload": data} + transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} - processed_record = record.build_data_transformation_response( + processed_record = KinesisFirehoseDataTransformationRecord( + record_id=record["recordId"], result="Ok", data=base64_from_json(transformed_data), ) - result.add_record(processed_record) # return transformed records From 3a1156382160d2eb1b109ce17bca8c66861b4783 Mon Sep 17 00:00:00 2001 From: Leandro Damascena Date: Tue, 12 Sep 2023 16:24:29 +0100 Subject: [PATCH 18/32] Addressing Heitor's feedback --- .../utilities/serialization.py | 16 ++++++++ .../test_kinesis_firehose_response.py | 41 +++++++++++-------- 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/aws_lambda_powertools/utilities/serialization.py b/aws_lambda_powertools/utilities/serialization.py index 1242ab77f4..ef76eec70e 100644 --- a/aws_lambda_powertools/utilities/serialization.py +++ b/aws_lambda_powertools/utilities/serialization.py @@ -4,6 +4,22 @@ from typing import Any, Callable +def base64_encode(data: str) -> str: + """Encode a string and returns Base64-encoded encoded value. + + Parameters + ---------- + data: str + The string to encode. + + Returns + ------- + str + The Base64-encoded encoded value. + """ + return base64.b64encode(data.encode()).decode("utf-8") + + def base64_decode(data: str) -> str: """Decodes a Base64-encoded string and returns the decoded value. diff --git a/tests/unit/data_classes/test_kinesis_firehose_response.py b/tests/unit/data_classes/test_kinesis_firehose_response.py index abf730ea42..95b3bd6aa5 100644 --- a/tests/unit/data_classes/test_kinesis_firehose_response.py +++ b/tests/unit/data_classes/test_kinesis_firehose_response.py @@ -4,6 +4,7 @@ KinesisFirehoseDataTransformationResponse, KinesisFirehoseEvent, ) +from aws_lambda_powertools.utilities.serialization import base64_encode, base64_from_str from tests.functional.utils import load_event @@ -42,30 +43,38 @@ def test_kinesis_firehose_response(): def test_kinesis_firehose_create_response(): + # GIVEN a Kinesis Firehose Event with two records raw_event = load_event("kinesisFirehoseKinesisEvent.json") parsed_event = KinesisFirehoseEvent(data=raw_event) + # WHEN we create a Data Transformation Response changing the data + # WHEN we add partitions keys + + arbitrary_data = "arbitrary data" + response = KinesisFirehoseDataTransformationResponse() for record in parsed_event.records: - # if data was delivered as json; caches loaded value - data = record.data_as_text metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": 2023}) processed_record = record.build_data_transformation_response( result="Ok", metadata=metadata_partition, + data=base64_from_str(arbitrary_data), ) - processed_record.data_from_text(data=data) response.add_record(record=processed_record) - response_dict = response.asdict() - - res_records = list(response_dict["records"]) - assert len(res_records) == 2 - record_01, record_02 = res_records[:] - record01_raw = raw_event["records"][0] - assert record_01["result"] == "Ok" - assert record_01["recordId"] == record01_raw["recordId"] - assert record_01["data"] == record01_raw["data"] - assert record_01["metadata"]["partitionKeys"]["year"] == 2023 - - assert response.records[0].data_as_bytes == b"Hello World" - assert 
response.records[0].data_as_text == "Hello World" + + # THEN we should have the same record data + record_01, record_02 = response.records[0], response.records[1] + raw_record_01, raw_record_02 = raw_event["records"][0], raw_event["records"][1] + + assert len(response.records) == 2 + + assert record_01.result == "Ok" + assert record_02.result == "Ok" + + assert record_01.record_id == raw_record_01["recordId"] + assert record_02.record_id == raw_record_02["recordId"] + + assert record_01.data == base64_encode(arbitrary_data) + assert record_02.data == base64_encode(arbitrary_data) + + assert record_01.metadata.partition_keys["year"] == 2023 From 17c676338a39644f641621164621093e5d8fed93 Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Tue, 12 Sep 2023 22:13:25 +0000 Subject: [PATCH 19/32] add result warning, add asdict test, metadata test --- .../data_classes/kinesis_firehose_event.py | 23 +++++++++- .../test_kinesis_firehose_response.py | 43 ++++++++++++++++--- 2 files changed, 59 insertions(+), 7 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 19dc22bcb1..62988ccbd6 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -1,5 +1,6 @@ import base64 import json +import warnings from dataclasses import dataclass, field from typing import Any, Callable, Dict, Iterator, List, Optional @@ -11,12 +12,24 @@ @dataclass(repr=False, order=False, frozen=True) class KinesisFirehoseDataTransformationRecordMetadata: """ + Metadata in Firehose Data Transform Record. + + Parameters + ---------- + partition_keys: Dict[str, str] + A dict of partition keys/value in string format, e.g. 
`{"year":"2023","month":"09"}` + Documentation: -------------- - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html """ - partition_keys: Dict[str, Any] = field(default_factory=lambda: {}) + partition_keys: Dict[str, str] = field(default_factory=lambda: {}) + + def asdict(self) -> Dict: + if self.partition_keys is not None: + return {"partitionKeys": self.partition_keys} + return {} @dataclass(repr=False, order=False) @@ -58,13 +71,19 @@ class KinesisFirehoseDataTransformationRecord: _json_data: Optional[Any] = None def asdict(self) -> Dict: + if self.result not in ["Ok", "Dropped", "ProcessingFailed"]: + warnings.warn( + stacklevel=1, + message=f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"', + ) + record: Dict[str, Any] = { "recordId": self.record_id, "result": self.result, "data": self.data, } if self.metadata: - record["metadata"] = self.metadata.__dict__ + record["metadata"] = self.metadata.asdict() return record @property diff --git a/tests/unit/data_classes/test_kinesis_firehose_response.py b/tests/unit/data_classes/test_kinesis_firehose_response.py index 95b3bd6aa5..4a0263837b 100644 --- a/tests/unit/data_classes/test_kinesis_firehose_response.py +++ b/tests/unit/data_classes/test_kinesis_firehose_response.py @@ -8,6 +8,20 @@ from tests.functional.utils import load_event +def test_kinesis_firehose_response_metadata(): + # When we create metadata with partition keys and attach to a firehose response record + metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": "2023"}) + + processed_record = KinesisFirehoseDataTransformationRecord( + record_id="test_id", + metadata=metadata_partition, + data="", + ) + # Then we should have partition keys available in metadata field with same value + assert processed_record.metadata.partition_keys["year"] == "2023" + assert metadata_partition.asdict() == {"partitionKeys": {"year": "2023"}} + + def test_kinesis_firehose_response(): # GIVEN a Kinesis Firehose Event with two records raw_event = load_event("kinesisFirehoseKinesisEvent.json") @@ -16,10 +30,8 @@ def test_kinesis_firehose_response(): # WHEN we create a Data Transformation Response without changing the data response = KinesisFirehoseDataTransformationResponse() for record in parsed_event.records: - metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": 2023}) processed_record = KinesisFirehoseDataTransformationRecord( record_id=record.record_id, - metadata=metadata_partition, data=record.data, ) response.add_record(record=processed_record) @@ -39,7 +51,28 @@ def test_kinesis_firehose_response(): assert record_01.data == raw_record_01["data"] assert record_02.data == raw_record_02["data"] - assert record_01.metadata.partition_keys["year"] == 2023 + +def test_kinesis_firehose_response_asdict(): + # Given the following example response provided by Firehose + sample_response = { + "records": [ + {"recordId": "sample_record", "data": "", "result": "Ok", "metadata": {"partitionKeys": {"year": "2023"}}}, + ], + } + + # Then asdict function should be able to return the same value + response = KinesisFirehoseDataTransformationResponse() + metadata_partition = KinesisFirehoseDataTransformationRecordMetadata( + partition_keys=sample_response["records"][0]["metadata"]["partitionKeys"], + ) + processed_record = KinesisFirehoseDataTransformationRecord( + record_id=sample_response["records"][0]["recordId"], + data=sample_response["records"][0]["data"], + 
result=sample_response["records"][0]["result"], + metadata=metadata_partition, + ) + response.add_record(record=processed_record) + assert response.asdict() == sample_response def test_kinesis_firehose_create_response(): @@ -54,7 +87,7 @@ def test_kinesis_firehose_create_response(): response = KinesisFirehoseDataTransformationResponse() for record in parsed_event.records: - metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": 2023}) + metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": "2023"}) processed_record = record.build_data_transformation_response( result="Ok", metadata=metadata_partition, @@ -77,4 +110,4 @@ def test_kinesis_firehose_create_response(): assert record_01.data == base64_encode(arbitrary_data) assert record_02.data == base64_encode(arbitrary_data) - assert record_01.metadata.partition_keys["year"] == 2023 + assert record_01.metadata.partition_keys["year"] == "2023" From cad6c09b13c181648624c80ccde08f737a9a6f3f Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Tue, 12 Sep 2023 16:49:47 +0200 Subject: [PATCH 20/32] refactor: initial refactoring --- .../utilities/data_classes/kinesis_firehose_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 62988ccbd6..6098f0b0ae 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -83,7 +83,7 @@ def asdict(self) -> Dict: "data": self.data, } if self.metadata: - record["metadata"] = self.metadata.asdict() + record["metadata"] = self.metadata.__dict__ return record @property From 6809e0e6d0fa5e9f61d528d6931e8de14d0bfd34 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 14 Sep 2023 21:38:56 +0200 Subject: [PATCH 21/32] chore: branding Signed-off-by: heitorlessa --- .../utilities/data_classes/kinesis_firehose_event.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 6098f0b0ae..4f451c805c 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -132,7 +132,7 @@ def add_record(self, record: KinesisFirehoseDataTransformationRecord): def asdict(self) -> Dict: if not self.records: - raise ValueError("Kinesis Firehose doesn't accept empty response") + raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response") return {"records": [record.asdict() for record in self.records]} @@ -216,7 +216,8 @@ def build_data_transformation_response( data: str = "", metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None, ) -> KinesisFirehoseDataTransformationRecord: - """create a KinesisFirehoseResponseRecord directly using the record_id and given values + """Create a KinesisFirehoseResponseRecord directly using the record_id and given values + Parameters ---------- result : Literal["Ok", "Dropped", "ProcessingFailed"] From 42a0de94a8d2ceeaef37b3449ebcb69c9ca94f0c Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 14 Sep 2023 21:55:22 +0200 Subject: [PATCH 22/32] refactor: use classvar and tuple for perf Signed-off-by: heitorlessa --- .../utilities/data_classes/kinesis_firehose_event.py 
| 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 4f451c805c..814e908a4d 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -2,7 +2,7 @@ import json import warnings from dataclasses import dataclass, field -from typing import Any, Callable, Dict, Iterator, List, Optional +from typing import Any, Callable, ClassVar, Dict, Iterator, List, Optional, Tuple from typing_extensions import Literal @@ -62,6 +62,8 @@ class KinesisFirehoseDataTransformationRecord: - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html """ + _valid_result_types: ClassVar[Tuple[str]] = ("Ok", "Dropped", "ProcessingFailed") + record_id: str result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok" data: str = "" @@ -71,7 +73,7 @@ class KinesisFirehoseDataTransformationRecord: _json_data: Optional[Any] = None def asdict(self) -> Dict: - if self.result not in ["Ok", "Dropped", "ProcessingFailed"]: + if self.result not in self._valid_result_types: warnings.warn( stacklevel=1, message=f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"', From d05ca07f71c839a1421396a3029b938a02ddc327 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 14 Sep 2023 22:01:13 +0200 Subject: [PATCH 23/32] chore: fix rebase issue Signed-off-by: heitorlessa --- .../utilities/data_classes/kinesis_firehose_event.py | 2 +- tests/unit/data_classes/test_kinesis_firehose_response.py | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 814e908a4d..144f5b8540 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -85,7 +85,7 @@ def asdict(self) -> Dict: "data": self.data, } if self.metadata: - record["metadata"] = self.metadata.__dict__ + record["metadata"] = self.metadata.asdict() return record @property diff --git a/tests/unit/data_classes/test_kinesis_firehose_response.py b/tests/unit/data_classes/test_kinesis_firehose_response.py index 4a0263837b..94f82e008c 100644 --- a/tests/unit/data_classes/test_kinesis_firehose_response.py +++ b/tests/unit/data_classes/test_kinesis_firehose_response.py @@ -53,24 +53,27 @@ def test_kinesis_firehose_response(): def test_kinesis_firehose_response_asdict(): - # Given the following example response provided by Firehose + # GIVEN the following example response provided by Firehose sample_response = { "records": [ {"recordId": "sample_record", "data": "", "result": "Ok", "metadata": {"partitionKeys": {"year": "2023"}}}, ], } - # Then asdict function should be able to return the same value response = KinesisFirehoseDataTransformationResponse() metadata_partition = KinesisFirehoseDataTransformationRecordMetadata( partition_keys=sample_response["records"][0]["metadata"]["partitionKeys"], ) + + # WHEN we create a transformation record with the exact same data processed_record = KinesisFirehoseDataTransformationRecord( record_id=sample_response["records"][0]["recordId"], data=sample_response["records"][0]["data"], result=sample_response["records"][0]["result"], metadata=metadata_partition, ) + + # THEN 
serialized response should return the same value response.add_record(record=processed_record) assert response.asdict() == sample_response From b20137ac44117b75e1f068345862220eaa8df21e Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 14 Sep 2023 22:02:33 +0200 Subject: [PATCH 24/32] chore: fix mypy tuple exactness type Signed-off-by: heitorlessa --- .../utilities/data_classes/kinesis_firehose_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 144f5b8540..4518409ad3 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -62,7 +62,7 @@ class KinesisFirehoseDataTransformationRecord: - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html """ - _valid_result_types: ClassVar[Tuple[str]] = ("Ok", "Dropped", "ProcessingFailed") + _valid_result_types: ClassVar[Tuple[str, str, str]] = ("Ok", "Dropped", "ProcessingFailed") record_id: str result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok" From ef12496ea40729efa71746bc1da14863b0ad85af Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Thu, 14 Sep 2023 20:06:41 +0000 Subject: [PATCH 25/32] remove Ok in example response,add failure example --- docs/utilities/data_classes.md | 8 ++++ .../src/kinesis_firehose_delivery_stream.py | 1 - ...kinesis_firehose_delivery_stream_failed.py | 31 +++++++++++++++ .../src/kinesis_firehose_response.py | 1 - .../kinesis_firehose_response_exception.py | 39 +++++++++++++++++++ 5 files changed, 78 insertions(+), 2 deletions(-) create mode 100644 examples/event_sources/src/kinesis_firehose_delivery_stream_failed.py create mode 100644 examples/event_sources/src/kinesis_firehose_response_exception.py diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index e859e9da16..d3bc898e0f 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -989,6 +989,10 @@ in the example below. ```python --8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py" ``` +=== "with Failure" + ```python hl_lines="25" + --8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream_failed.py" + ``` You can also construct response without using `event_source` wrapper. Shown in the example below. @@ -997,6 +1001,10 @@ You can also construct response without using `event_source` wrapper. 
Shown in t ```python --8<-- "examples/event_sources/src/kinesis_firehose_response.py" ``` +=== "with Exception" + ```python hl_lines="26" + --8<-- "examples/event_sources/src/kinesis_firehose_response_exception.py" + ``` ### Lambda Function URL diff --git a/examples/event_sources/src/kinesis_firehose_delivery_stream.py b/examples/event_sources/src/kinesis_firehose_delivery_stream.py index ef60f4cabf..88d0a29983 100644 --- a/examples/event_sources/src/kinesis_firehose_delivery_stream.py +++ b/examples/event_sources/src/kinesis_firehose_delivery_stream.py @@ -20,7 +20,6 @@ def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): transformed_data = {"new_data": "transformed data using Powertools", "original_payload": data} processed_record = record.build_data_transformation_response( - result="Ok", data=base64_from_json(transformed_data), ) diff --git a/examples/event_sources/src/kinesis_firehose_delivery_stream_failed.py b/examples/event_sources/src/kinesis_firehose_delivery_stream_failed.py new file mode 100644 index 0000000000..9e6dd936a3 --- /dev/null +++ b/examples/event_sources/src/kinesis_firehose_delivery_stream_failed.py @@ -0,0 +1,31 @@ +from aws_lambda_powertools.utilities.data_classes import ( + KinesisFirehoseDataTransformationResponse, + KinesisFirehoseEvent, + event_source, +) +from aws_lambda_powertools.utilities.serialization import base64_from_json +from aws_lambda_powertools.utilities.typing import LambdaContext + + +@event_source(data_class=KinesisFirehoseEvent) +def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): + result = KinesisFirehoseDataTransformationResponse() + + for record in event.records: + # get original data using data_as_text property + data = record.data_as_text + + ## do all kind of stuff with data + ## generate data to return + transformed_data = {"new_data": "transformed data using Powertools", "original_payload": data} + + # some process failed, send back to kinesis + processed_record = record.build_data_transformation_response( + data=base64_from_json(transformed_data), + result="ProcessingFailed", + ) + + result.add_record(processed_record) + + # return transformed records + return result.asdict() diff --git a/examples/event_sources/src/kinesis_firehose_response.py b/examples/event_sources/src/kinesis_firehose_response.py index 093a6f378e..04c9b51717 100644 --- a/examples/event_sources/src/kinesis_firehose_response.py +++ b/examples/event_sources/src/kinesis_firehose_response.py @@ -20,7 +20,6 @@ def lambda_handler(event: dict, context: LambdaContext): processed_record = KinesisFirehoseDataTransformationRecord( record_id=record["recordId"], - result="Ok", data=base64_from_json(transformed_data), ) result.add_record(processed_record) diff --git a/examples/event_sources/src/kinesis_firehose_response_exception.py b/examples/event_sources/src/kinesis_firehose_response_exception.py new file mode 100644 index 0000000000..0e2e59454e --- /dev/null +++ b/examples/event_sources/src/kinesis_firehose_response_exception.py @@ -0,0 +1,39 @@ +import base64 + +from aws_lambda_powertools.utilities.data_classes import ( + KinesisFirehoseDataTransformationRecord, + KinesisFirehoseDataTransformationResponse, +) +from aws_lambda_powertools.utilities.serialization import base64_from_json +from aws_lambda_powertools.utilities.typing import LambdaContext + + +def lambda_handler(event: dict, context: LambdaContext): + result = KinesisFirehoseDataTransformationResponse() + + for record in event["records"]: + print(record["recordId"]) + try: 
+ payload = base64.b64decode(record["data"]).decode("utf-8") + ## do all kind of stuff with payload + ## generate data to return + transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} + except Exception: + # add Failed result to processing results and send back to kinesis + processed_record = KinesisFirehoseDataTransformationRecord( + record_id=record["recordId"], + data=base64_from_json(transformed_data), + result="ProcessingFailed", + ) + result.add_record(processed_record) + continue + + # Default result is Ok + processed_record = KinesisFirehoseDataTransformationRecord( + record_id=record["recordId"], + data=base64_from_json(transformed_data), + ) + result.add_record(processed_record) + + # return transformed records + return result.asdict() From 3e43a25151a9362210536e2835a4d75944209231 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 14 Sep 2023 22:10:23 +0200 Subject: [PATCH 26/32] chore: clean up docs example Signed-off-by: heitorlessa --- .../event_sources/src/kinesis_firehose_response.py | 13 ++++++------- .../data_classes/test_kinesis_firehose_response.py | 1 - 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/examples/event_sources/src/kinesis_firehose_response.py b/examples/event_sources/src/kinesis_firehose_response.py index 04c9b51717..0336b65621 100644 --- a/examples/event_sources/src/kinesis_firehose_response.py +++ b/examples/event_sources/src/kinesis_firehose_response.py @@ -1,25 +1,24 @@ -import base64 - from aws_lambda_powertools.utilities.data_classes import ( KinesisFirehoseDataTransformationRecord, KinesisFirehoseDataTransformationResponse, + KinesisFirehoseEvent, ) from aws_lambda_powertools.utilities.serialization import base64_from_json from aws_lambda_powertools.utilities.typing import LambdaContext def lambda_handler(event: dict, context: LambdaContext): + firehose_event = KinesisFirehoseEvent(event) result = KinesisFirehoseDataTransformationResponse() - for record in event["records"]: - print(record["recordId"]) - payload = base64.b64decode(record["data"]).decode("utf-8") - ## do all kind of stuff with payload + for record in firehose_event.records: + payload = record.data_as_text # base64 decoded data as str + ## generate data to return transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} processed_record = KinesisFirehoseDataTransformationRecord( - record_id=record["recordId"], + record_id=record.record_id, data=base64_from_json(transformed_data), ) result.add_record(processed_record) diff --git a/tests/unit/data_classes/test_kinesis_firehose_response.py b/tests/unit/data_classes/test_kinesis_firehose_response.py index 94f82e008c..0be8d0d3ec 100644 --- a/tests/unit/data_classes/test_kinesis_firehose_response.py +++ b/tests/unit/data_classes/test_kinesis_firehose_response.py @@ -92,7 +92,6 @@ def test_kinesis_firehose_create_response(): for record in parsed_event.records: metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": "2023"}) processed_record = record.build_data_transformation_response( - result="Ok", metadata=metadata_partition, data=base64_from_str(arbitrary_data), ) From 4ed0e66d7cb2b49ed27edce4b3a564d5f3d83ad1 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 14 Sep 2023 22:30:33 +0200 Subject: [PATCH 27/32] chore: lower cognitive overhead; add example docstring Signed-off-by: heitorlessa --- .../data_classes/kinesis_firehose_event.py | 35 +++++++++++++++++++ .../src/kinesis_firehose_response.py | 2 +- 2 files changed, 36 
insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py index 4518409ad3..dd42a09fa5 100644 --- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py +++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py @@ -125,6 +125,41 @@ class KinesisFirehoseDataTransformationResponse: records : List[KinesisFirehoseResponseRecord] records of Kinesis Data Firehose response object, optional parameter at start. can be added later using `add_record` function. + + Examples + -------- + + **Transforming data records** + + ```python + from aws_lambda_powertools.utilities.data_classes import ( + KinesisFirehoseDataTransformationRecord, + KinesisFirehoseDataTransformationResponse, + KinesisFirehoseEvent, + ) + from aws_lambda_powertools.utilities.serialization import base64_from_json + from aws_lambda_powertools.utilities.typing import LambdaContext + + + def lambda_handler(event: dict, context: LambdaContext): + firehose_event = KinesisFirehoseEvent(event) + result = KinesisFirehoseDataTransformationResponse() + + for record in firehose_event.records: + payload = record.data_as_text # base64 decoded data as str + + ## generate data to return + transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} + processed_record = KinesisFirehoseDataTransformationRecord( + record_id=record.record_id, + data=base64_from_json(transformed_data), + ) + + result.add_record(processed_record) + + # return transformed records + return result.asdict() + ``` """ records: List[KinesisFirehoseDataTransformationRecord] = field(default_factory=list) diff --git a/examples/event_sources/src/kinesis_firehose_response.py b/examples/event_sources/src/kinesis_firehose_response.py index 0336b65621..fc87d5cb74 100644 --- a/examples/event_sources/src/kinesis_firehose_response.py +++ b/examples/event_sources/src/kinesis_firehose_response.py @@ -16,11 +16,11 @@ def lambda_handler(event: dict, context: LambdaContext): ## generate data to return transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} - processed_record = KinesisFirehoseDataTransformationRecord( record_id=record.record_id, data=base64_from_json(transformed_data), ) + result.add_record(processed_record) # return transformed records From d1fc1c5f6cf401194532f9d605ae1cf346766c20 Mon Sep 17 00:00:00 2001 From: RogerZhang Date: Thu, 14 Sep 2023 20:34:35 +0000 Subject: [PATCH 28/32] add drop example --- docs/utilities/data_classes.md | 12 +++---- ...kinesis_firehose_delivery_stream_failed.py | 31 ---------------- .../src/kinesis_firehose_response.py | 14 ++++---- .../src/kinesis_firehose_response_drop.py | 36 +++++++++++++++++++ .../kinesis_firehose_response_exception.py | 27 +++++++------- 5 files changed, 61 insertions(+), 59 deletions(-) delete mode 100644 examples/event_sources/src/kinesis_firehose_delivery_stream_failed.py create mode 100644 examples/event_sources/src/kinesis_firehose_response_drop.py diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index d3bc898e0f..2538d401c6 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -989,10 +989,6 @@ in the example below. 
```python --8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py" ``` -=== "with Failure" - ```python hl_lines="25" - --8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream_failed.py" - ``` You can also construct response without using `event_source` wrapper. Shown in the example below. @@ -1001,10 +997,14 @@ You can also construct response without using `event_source` wrapper. Shown in t ```python --8<-- "examples/event_sources/src/kinesis_firehose_response.py" ``` -=== "with Exception" - ```python hl_lines="26" +=== "with Failure" + ```python hl_lines="30" --8<-- "examples/event_sources/src/kinesis_firehose_response_exception.py" ``` +=== "with Dropped" + ```python hl_lines="30" + --8<-- "examples/event_sources/src/kinesis_firehose_response_drop.py" + ``` ### Lambda Function URL diff --git a/examples/event_sources/src/kinesis_firehose_delivery_stream_failed.py b/examples/event_sources/src/kinesis_firehose_delivery_stream_failed.py deleted file mode 100644 index 9e6dd936a3..0000000000 --- a/examples/event_sources/src/kinesis_firehose_delivery_stream_failed.py +++ /dev/null @@ -1,31 +0,0 @@ -from aws_lambda_powertools.utilities.data_classes import ( - KinesisFirehoseDataTransformationResponse, - KinesisFirehoseEvent, - event_source, -) -from aws_lambda_powertools.utilities.serialization import base64_from_json -from aws_lambda_powertools.utilities.typing import LambdaContext - - -@event_source(data_class=KinesisFirehoseEvent) -def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): - result = KinesisFirehoseDataTransformationResponse() - - for record in event.records: - # get original data using data_as_text property - data = record.data_as_text - - ## do all kind of stuff with data - ## generate data to return - transformed_data = {"new_data": "transformed data using Powertools", "original_payload": data} - - # some process failed, send back to kinesis - processed_record = record.build_data_transformation_response( - data=base64_from_json(transformed_data), - result="ProcessingFailed", - ) - - result.add_record(processed_record) - - # return transformed records - return result.asdict() diff --git a/examples/event_sources/src/kinesis_firehose_response.py b/examples/event_sources/src/kinesis_firehose_response.py index 04c9b51717..1b175338e9 100644 --- a/examples/event_sources/src/kinesis_firehose_response.py +++ b/examples/event_sources/src/kinesis_firehose_response.py @@ -1,27 +1,27 @@ -import base64 - from aws_lambda_powertools.utilities.data_classes import ( KinesisFirehoseDataTransformationRecord, KinesisFirehoseDataTransformationResponse, + KinesisFirehoseEvent, ) from aws_lambda_powertools.utilities.serialization import base64_from_json from aws_lambda_powertools.utilities.typing import LambdaContext def lambda_handler(event: dict, context: LambdaContext): + firehose_event = KinesisFirehoseEvent(event) result = KinesisFirehoseDataTransformationResponse() - for record in event["records"]: - print(record["recordId"]) - payload = base64.b64decode(record["data"]).decode("utf-8") - ## do all kind of stuff with payload + for record in firehose_event.records: + payload = record.data_as_text # base64 decoded data as str + ## generate data to return transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} processed_record = KinesisFirehoseDataTransformationRecord( - record_id=record["recordId"], + record_id=record.record_id, data=base64_from_json(transformed_data), ) + result.add_record(processed_record) # return 
transformed records diff --git a/examples/event_sources/src/kinesis_firehose_response_drop.py b/examples/event_sources/src/kinesis_firehose_response_drop.py new file mode 100644 index 0000000000..4d6ec12a6c --- /dev/null +++ b/examples/event_sources/src/kinesis_firehose_response_drop.py @@ -0,0 +1,36 @@ +from aws_lambda_powertools.utilities.data_classes import ( + KinesisFirehoseDataTransformationRecord, + KinesisFirehoseDataTransformationResponse, + KinesisFirehoseEvent, +) +from aws_lambda_powertools.utilities.serialization import base64_from_json +from aws_lambda_powertools.utilities.typing import LambdaContext + + +def lambda_handler(event: dict, context: LambdaContext): + firehose_event = KinesisFirehoseEvent(event) + result = KinesisFirehoseDataTransformationResponse() + + for record in firehose_event.records: + try: + payload = record.data_as_text # base64 decoded data as str + ## do all kind of stuff with payload + ## generate data to return + transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} + # Default result is Ok + processed_record = KinesisFirehoseDataTransformationRecord( + record_id=record.record_id, + data=base64_from_json(transformed_data), + ) + except Exception: + # encountered failure that couldn't be fixed by retry + processed_record = KinesisFirehoseDataTransformationRecord( + record_id=record.record_id, + data=record.data, + result="Dropped", + ) + + result.add_record(processed_record) + + # return transformed records + return result.asdict() diff --git a/examples/event_sources/src/kinesis_firehose_response_exception.py b/examples/event_sources/src/kinesis_firehose_response_exception.py index 0e2e59454e..5695c9aceb 100644 --- a/examples/event_sources/src/kinesis_firehose_response_exception.py +++ b/examples/event_sources/src/kinesis_firehose_response_exception.py @@ -1,38 +1,35 @@ -import base64 - from aws_lambda_powertools.utilities.data_classes import ( KinesisFirehoseDataTransformationRecord, KinesisFirehoseDataTransformationResponse, + KinesisFirehoseEvent, ) from aws_lambda_powertools.utilities.serialization import base64_from_json from aws_lambda_powertools.utilities.typing import LambdaContext def lambda_handler(event: dict, context: LambdaContext): + firehose_event = KinesisFirehoseEvent(event) result = KinesisFirehoseDataTransformationResponse() - for record in event["records"]: - print(record["recordId"]) + for record in firehose_event.records: try: - payload = base64.b64decode(record["data"]).decode("utf-8") + payload = record.data_as_text # base64 decoded data as str ## do all kind of stuff with payload ## generate data to return transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} - except Exception: - # add Failed result to processing results and send back to kinesis + # Default result is Ok processed_record = KinesisFirehoseDataTransformationRecord( - record_id=record["recordId"], + record_id=record.record_id, data=base64_from_json(transformed_data), + ) + except Exception: + # add Failed result to processing results, send back to kinesis for retry + processed_record = KinesisFirehoseDataTransformationRecord( + record_id=record.record_id, + data=record.data, result="ProcessingFailed", ) - result.add_record(processed_record) - continue - # Default result is Ok - processed_record = KinesisFirehoseDataTransformationRecord( - record_id=record["recordId"], - data=base64_from_json(transformed_data), - ) result.add_record(processed_record) # return transformed records From 
c039480d8a2146bbb4c5989944d98a6936e33f72 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 14 Sep 2023 22:49:42 +0200 Subject: [PATCH 29/32] docs: give info upfront, name examples --- docs/utilities/data_classes.md | 29 ++++++++----------- .../src/kinesis_firehose_response.py | 27 ----------------- 2 files changed, 12 insertions(+), 44 deletions(-) delete mode 100644 examples/event_sources/src/kinesis_firehose_response.py diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 2538d401c6..6309210371 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -975,36 +975,31 @@ or plain text, depending on the original payload. ### Kinesis Firehose delivery stream -Kinesis Firehose Data Transformation can use a Lambda Function to modify the records -inline, and re-emit them back to the Delivery Stream. +When using Kinesis Firehose, you can use a Lambda function to [perform data transformation](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html){target="_blank"}. For each transformed record, you can choose to either: -Similar to Kinesis Data Streams, the events contain base64 encoded data. You can use the helper -function to access the data either as json or plain text, depending on the original payload. +* **A)** Put them back to the delivery stream (default) +* **B)** Drop them so consumers don't receive them (e.g., data validation) +* **C)** Indicate a record failed data transformation and should be retried -When constructing response to Firehose, You can utilize the `KinesisFirehoseDataTransformationResponse` class shown -in the example below. +To do that, you can use `KinesisFirehoseDataTransformationResponse` class along with helper functions to make it easier to decode and encode base64 data in the stream. -=== "app.py" +=== "Transforming streaming records" ```python --8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py" ``` -You can also construct response without using `event_source` wrapper. Shown in the example below. 
- -=== "app.py" +=== "Dropping invalid records" ```python - --8<-- "examples/event_sources/src/kinesis_firehose_response.py" + --8<-- "examples/event_sources/src/kinesis_firehose_response_drop.py" ``` -=== "with Failure" - ```python hl_lines="30" + +=== "Indicating a processing failure" + + ```python --8<-- "examples/event_sources/src/kinesis_firehose_response_exception.py" ``` -=== "with Dropped" - ```python hl_lines="30" - --8<-- "examples/event_sources/src/kinesis_firehose_response_drop.py" - ``` ### Lambda Function URL diff --git a/examples/event_sources/src/kinesis_firehose_response.py b/examples/event_sources/src/kinesis_firehose_response.py deleted file mode 100644 index fc87d5cb74..0000000000 --- a/examples/event_sources/src/kinesis_firehose_response.py +++ /dev/null @@ -1,27 +0,0 @@ -from aws_lambda_powertools.utilities.data_classes import ( - KinesisFirehoseDataTransformationRecord, - KinesisFirehoseDataTransformationResponse, - KinesisFirehoseEvent, -) -from aws_lambda_powertools.utilities.serialization import base64_from_json -from aws_lambda_powertools.utilities.typing import LambdaContext - - -def lambda_handler(event: dict, context: LambdaContext): - firehose_event = KinesisFirehoseEvent(event) - result = KinesisFirehoseDataTransformationResponse() - - for record in firehose_event.records: - payload = record.data_as_text # base64 decoded data as str - - ## generate data to return - transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} - processed_record = KinesisFirehoseDataTransformationRecord( - record_id=record.record_id, - data=base64_from_json(transformed_data), - ) - - result.add_record(processed_record) - - # return transformed records - return result.asdict() From e11c71856b21c6c564ba4ac51c221b7118ce4775 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 14 Sep 2023 22:57:08 +0200 Subject: [PATCH 30/32] docs: improve transforming records example --- docs/utilities/data_classes.md | 5 ++++- .../event_sources/src/kinesis_firehose_delivery_stream.py | 5 ++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 6309210371..3977b6f8b3 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -985,10 +985,13 @@ To do that, you can use `KinesisFirehoseDataTransformationResponse` class along === "Transforming streaming records" - ```python + ```python hl_lines="2-3 12 28" --8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py" ``` + 1. **Ingesting JSON payloads?**
Use `record.data_as_json` to easily deserialize them. + 2. For your convenience, `base64_from_json` serializes a dict to JSON, then encode as base64 data. + === "Dropping invalid records" ```python diff --git a/examples/event_sources/src/kinesis_firehose_delivery_stream.py b/examples/event_sources/src/kinesis_firehose_delivery_stream.py index 88d0a29983..3dc6fbda70 100644 --- a/examples/event_sources/src/kinesis_firehose_delivery_stream.py +++ b/examples/event_sources/src/kinesis_firehose_delivery_stream.py @@ -13,14 +13,13 @@ def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): for record in event.records: # get original data using data_as_text property - data = record.data_as_text + data = record.data_as_text # (1)! - ## do all kind of stuff with data ## generate data to return transformed_data = {"new_data": "transformed data using Powertools", "original_payload": data} processed_record = record.build_data_transformation_response( - data=base64_from_json(transformed_data), + data=base64_from_json(transformed_data), # (2)! ) result.add_record(processed_record) From 745492cceaf1ae3d759535720633c192468a1f0b Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 14 Sep 2023 23:06:45 +0200 Subject: [PATCH 31/32] docs: improve dropping records example --- docs/utilities/data_classes.md | 4 +++- .../src/kinesis_firehose_response_drop.py | 20 +++++++++++-------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 3977b6f8b3..386d9808af 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -994,10 +994,12 @@ To do that, you can use `KinesisFirehoseDataTransformationResponse` class along === "Dropping invalid records" - ```python + ```python hl_lines="5-6 16 34" --8<-- "examples/event_sources/src/kinesis_firehose_response_drop.py" ``` + 1. This exception would be generated from `record.data_as_json` if invalid payload. 
+ === "Indicating a processing failure" ```python diff --git a/examples/event_sources/src/kinesis_firehose_response_drop.py b/examples/event_sources/src/kinesis_firehose_response_drop.py index 4d6ec12a6c..8b565480a3 100644 --- a/examples/event_sources/src/kinesis_firehose_response_drop.py +++ b/examples/event_sources/src/kinesis_firehose_response_drop.py @@ -1,29 +1,33 @@ +from json import JSONDecodeError +from typing import Dict + from aws_lambda_powertools.utilities.data_classes import ( KinesisFirehoseDataTransformationRecord, KinesisFirehoseDataTransformationResponse, KinesisFirehoseEvent, + event_source, ) from aws_lambda_powertools.utilities.serialization import base64_from_json from aws_lambda_powertools.utilities.typing import LambdaContext -def lambda_handler(event: dict, context: LambdaContext): - firehose_event = KinesisFirehoseEvent(event) +@event_source(data_class=KinesisFirehoseEvent) +def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): result = KinesisFirehoseDataTransformationResponse() - for record in firehose_event.records: + for record in event.records: try: - payload = record.data_as_text # base64 decoded data as str - ## do all kind of stuff with payload + payload: Dict = record.data_as_json # decodes and deserialize base64 JSON string + ## generate data to return transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} - # Default result is Ok + processed_record = KinesisFirehoseDataTransformationRecord( record_id=record.record_id, data=base64_from_json(transformed_data), ) - except Exception: - # encountered failure that couldn't be fixed by retry + except JSONDecodeError: # (1)! + # our producers ingest JSON payloads only; drop malformed records from the stream processed_record = KinesisFirehoseDataTransformationRecord( record_id=record.record_id, data=record.data, From 61ddee9f984a722c68eb3e843e56440898b1771f Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 14 Sep 2023 23:14:33 +0200 Subject: [PATCH 32/32] docs: improve exception example --- docs/utilities/data_classes.md | 4 +++- .../src/kinesis_firehose_response_exception.py | 9 ++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 386d9808af..6016e68cfe 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -1002,10 +1002,12 @@ To do that, you can use `KinesisFirehoseDataTransformationResponse` class along === "Indicating a processing failure" - ```python + ```python hl_lines="2-3 33" --8<-- "examples/event_sources/src/kinesis_firehose_response_exception.py" ``` + 1. This record will now be sent to your [S3 bucket in the `processing-failed` folder](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html#data-transformation-failure-handling){target="_blank"}. 
+ ### Lambda Function URL === "app.py" diff --git a/examples/event_sources/src/kinesis_firehose_response_exception.py b/examples/event_sources/src/kinesis_firehose_response_exception.py index 5695c9aceb..43ba3a039b 100644 --- a/examples/event_sources/src/kinesis_firehose_response_exception.py +++ b/examples/event_sources/src/kinesis_firehose_response_exception.py @@ -2,11 +2,13 @@ KinesisFirehoseDataTransformationRecord, KinesisFirehoseDataTransformationResponse, KinesisFirehoseEvent, + event_source, ) from aws_lambda_powertools.utilities.serialization import base64_from_json from aws_lambda_powertools.utilities.typing import LambdaContext +@event_source(data_class=KinesisFirehoseEvent) def lambda_handler(event: dict, context: LambdaContext): firehose_event = KinesisFirehoseEvent(event) result = KinesisFirehoseDataTransformationResponse() @@ -14,9 +16,10 @@ def lambda_handler(event: dict, context: LambdaContext): for record in firehose_event.records: try: payload = record.data_as_text # base64 decoded data as str - ## do all kind of stuff with payload - ## generate data to return + + # generate data to return transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} + # Default result is Ok processed_record = KinesisFirehoseDataTransformationRecord( record_id=record.record_id, @@ -27,7 +30,7 @@ def lambda_handler(event: dict, context: LambdaContext): processed_record = KinesisFirehoseDataTransformationRecord( record_id=record.record_id, data=record.data, - result="ProcessingFailed", + result="ProcessingFailed", # (1)! ) result.add_record(processed_record)
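
For reference, the response object assembled throughout these examples serializes to the plain dict contract Firehose expects back from a transformation Lambda. The sketch below is illustrative only and is not part of any patch in this series: the record IDs and payloads are invented, and the commented output shape is inferred from the AWS Firehose data-transformation contract linked in the docs changes above.

```python
from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
    KinesisFirehoseDataTransformationResponse,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json

result = KinesisFirehoseDataTransformationResponse()

# A successfully transformed record; `result` defaults to "Ok" as noted in the examples.
# The record IDs here are placeholders; real ones come from the incoming Firehose event.
result.add_record(
    KinesisFirehoseDataTransformationRecord(
        record_id="record-id-0001",
        data=base64_from_json({"tool_used": "powertools_dataclass"}),
    ),
)

# A record flagged as failed; its original base64 payload is returned untouched.
result.add_record(
    KinesisFirehoseDataTransformationRecord(
        record_id="record-id-0002",
        data="aGVsbG8=",  # base64 for "hello", a stand-in for the original payload
        result="ProcessingFailed",
    ),
)

# result.asdict() should produce a structure along these lines:
# {
#     "records": [
#         {"recordId": "record-id-0001", "result": "Ok", "data": "<base64 JSON>"},
#         {"recordId": "record-id-0002", "result": "ProcessingFailed", "data": "aGVsbG8="},
#     ]
# }
print(result.asdict())
```

Records marked `ProcessingFailed` end up in the `processing-failed` error output described in the docs annotation above, whereas `Dropped` records are simply removed from the delivery stream without being treated as errors.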