Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(event_source): add Kinesis Firehose Data Transformation data class #3029

Merged
merged 52 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
040358d
support kinesis response
roger-zhangg Aug 30, 2023
6eb8530
Merge branch 'develop' into kinesis
leandrodamascena Aug 30, 2023
7ca016e
fix lint, address Leandro suggestions
roger-zhangg Aug 30, 2023
785a144
Merge branch 'develop' of https://github.com/aws-powertools/powertool…
roger-zhangg Aug 30, 2023
708686f
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Aug 30, 2023
8c9db32
remove deleted const
roger-zhangg Aug 30, 2023
f02319f
fix Literal import in 3.7
roger-zhangg Aug 30, 2023
5f55aa7
change to use data-classes
roger-zhangg Sep 1, 2023
2566f62
fix mypy
roger-zhangg Sep 1, 2023
636e9d1
fix typo, make asdict a function
roger-zhangg Sep 1, 2023
d3114c4
Merge branch 'develop' into kinesis
leandrodamascena Sep 4, 2023
3fa7b2d
Merge branch 'develop' into kinesis
leandrodamascena Sep 5, 2023
370e156
address Troy/Leandro suggestions
roger-zhangg Sep 5, 2023
99837fc
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Sep 5, 2023
029a55c
Merge branch 'develop' into kinesis
leandrodamascena Sep 6, 2023
af1abfe
remove 6MB comment
roger-zhangg Sep 6, 2023
2032146
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Sep 6, 2023
4168e21
Merge branch 'develop' into kinesis
leandrodamascena Sep 7, 2023
312830b
fix comments
roger-zhangg Sep 7, 2023
a6c05a5
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Sep 7, 2023
a3ed9f9
address Heitor's suggestion
roger-zhangg Sep 7, 2023
bfbee60
data class default optimization
roger-zhangg Sep 7, 2023
4016446
remove slot for static check
roger-zhangg Sep 7, 2023
5dbb3ff
fix doc, example
roger-zhangg Sep 7, 2023
5b8a9c6
Merge branch 'develop' into kinesis
leandrodamascena Sep 7, 2023
95b3958
Merge branch 'develop' into kinesis
leandrodamascena Sep 11, 2023
e4d75d7
rename r->record
roger-zhangg Sep 11, 2023
f5ca27d
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg Sep 11, 2023
46fbe98
Merge branch 'develop' into kinesis
leandrodamascena Sep 11, 2023
ddd49d2
Merge branch 'develop' into kinesis
leandrodamascena Sep 12, 2023
ce6ed61
Addressing Heitor's feedback
leandrodamascena Sep 12, 2023
d8be53e
Addressing Heitor's feedback
leandrodamascena Sep 12, 2023
3a11563
Addressing Heitor's feedback
leandrodamascena Sep 12, 2023
17c6763
add result warning, add asdict test, metadata test
roger-zhangg Sep 12, 2023
00051a8
Merge branch 'develop' into kinesis
roger-zhangg Sep 13, 2023
455402a
Merge branch 'develop' into kinesis
leandrodamascena Sep 14, 2023
cad6c09
refactor: initial refactoring
heitorlessa Sep 12, 2023
6809e0e
chore: branding
heitorlessa Sep 14, 2023
42a0de9
refactor: use classvar and tuple for perf
heitorlessa Sep 14, 2023
d05ca07
chore: fix rebase issue
heitorlessa Sep 14, 2023
b20137a
chore: fix mypy tuple exactness type
heitorlessa Sep 14, 2023
ef12496
remove Ok in example response,add failure example
roger-zhangg Sep 14, 2023
f2d5e63
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg Sep 14, 2023
3e43a25
chore: clean up docs example
heitorlessa Sep 14, 2023
4ed0e66
chore: lower cognitive overhead; add example docstring
heitorlessa Sep 14, 2023
d1fc1c5
add drop example
roger-zhangg Sep 14, 2023
eb39c03
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg Sep 14, 2023
c039480
docs: give info upfront, name examples
heitorlessa Sep 14, 2023
e11c718
docs: improve transforming records example
heitorlessa Sep 14, 2023
745492c
docs: improve dropping records example
heitorlessa Sep 14, 2023
61ddee9
docs: improve exception example
heitorlessa Sep 14, 2023
6f95e8f
Merge branch 'develop' into kinesis
leandrodamascena Sep 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion aws_lambda_powertools/utilities/data_classes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
from .event_bridge_event import EventBridgeEvent
from .event_source import event_source
from .kafka_event import KafkaEvent
from .kinesis_firehose_event import KinesisFirehoseEvent
from .kinesis_firehose_event import (
KinesisFirehoseDataTransformationRecord,
KinesisFirehoseDataTransformationRecordMetadata,
KinesisFirehoseDataTransformationResponse,
KinesisFirehoseEvent,
)
from .kinesis_stream_event import KinesisStreamEvent
from .lambda_function_url_event import LambdaFunctionUrlEvent
from .s3_event import S3Event, S3EventBridgeNotificationEvent
Expand All @@ -37,6 +42,9 @@
"KafkaEvent",
"KinesisFirehoseEvent",
"KinesisStreamEvent",
"KinesisFirehoseDataTransformationResponse",
"KinesisFirehoseDataTransformationRecord",
"KinesisFirehoseDataTransformationRecordMetadata",
"LambdaFunctionUrlEvent",
"S3Event",
"S3EventBridgeNotificationEvent",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,125 @@
import base64
from typing import Iterator, Optional
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterator, List, Optional

from typing_extensions import Literal

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


@dataclass(repr=False, order=False, slots=True, frozen=True)
class KinesisFirehoseDataTransformationRecordMetadata:
"""
Documentation:
roger-zhangg marked this conversation as resolved.
Show resolved Hide resolved
--------------
- https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
"""

partition_keys: Optional[Dict[str, str]]
leandrodamascena marked this conversation as resolved.
Show resolved Hide resolved

def asdict(self) -> Dict:
if self.partition_keys is not None:
return {"partitionKeys": self.partition_keys}
return {}
leandrodamascena marked this conversation as resolved.
Show resolved Hide resolved


@dataclass(repr=False, order=False, slots=True)
class KinesisFirehoseDataTransformationRecord:
"""Record in Kinesis Data Firehose response object

Documentation:
--------------
- https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
"""

# Record ID; uniquely identifies this record within the current batch"""
record_id: str
# Processing result, supported value: Ok, Dropped, ProcessingFailed"""
result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok"
# data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or
# use either function like `data_from_text`, `data_from_json` to populate data"""
data: Optional[str] = None
# Optional: Metadata associated with this record; can contain partition keys
# See - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None
_json_data: Optional[Any] = None
json_serializer: Callable = json.dumps
json_deserializer: Callable = json.loads
roger-zhangg marked this conversation as resolved.
Show resolved Hide resolved

def data_from_byte(self, data: bytes):
"""Populate data field using a byte like data"""
self.data = base64.b64encode(data).decode("utf-8")

def data_from_text(self, data: str):
"""Populate data field using a string like data"""
self.data_from_byte(data.encode("utf-8"))

def data_from_json(self, data: Any):
"""Populate data field using any structure that could be converted to json"""
self.data_from_text(data=self.json_serializer(data))
leandrodamascena marked this conversation as resolved.
Show resolved Hide resolved

def asdict(self) -> Dict:
r: Dict[str, Any] = {
"recordId": self.record_id,
"result": self.result,
"data": self.data,
}
if self.metadata:
r["metadata"] = self.metadata.asdict()
return r
roger-zhangg marked this conversation as resolved.
Show resolved Hide resolved

@property
def data_as_bytes(self) -> bytes:
"""Decoded base64-encoded data as bytes"""
if not self.data:
return b""
return base64.b64decode(self.data)

@property
def data_as_text(self) -> str:
"""Decoded base64-encoded data as text"""
if not self.data:
return ""
return self.data_as_bytes.decode("utf-8")

@property
def data_as_json(self) -> Dict:
"""Decoded base64-encoded data loaded to json"""
if not self.data:
return {}
if self._json_data is None:
self._json_data = self.json_deserializer(self.data_as_text)
return self._json_data


@dataclass(repr=False, order=False, slots=True)
class KinesisFirehoseDataTransformationResponse:
"""Kinesis Data Firehose response object

Documentation:
--------------
- https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

Parameters
----------
records : List[KinesisFirehoseResponseRecord]
records of Kinesis Data Firehose response object,
optional parameter at start. can be added later using `add_record` function.
"""

records: List[KinesisFirehoseDataTransformationRecord] = field(default_factory=list)

def add_record(self, record: KinesisFirehoseDataTransformationRecord):
self.records.append(record)

def asdict(self) -> Dict:
if not self.records:
raise ValueError("Kinesis Firehose doesn't accept empty response")

return {"records": [r.asdict() for r in self.records]}
roger-zhangg marked this conversation as resolved.
Show resolved Hide resolved


class KinesisFirehoseRecordMetadata(DictWrapper):
@property
def _metadata(self) -> dict:
Expand Down Expand Up @@ -77,6 +193,31 @@ def data_as_json(self) -> dict:
self._json_data = self._json_deserializer(self.data_as_text)
return self._json_data

def build_data_transformation_response(
self,
result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok",
data: Optional[str] = None,
metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None,
) -> KinesisFirehoseDataTransformationRecord:
"""create a KinesisFirehoseResponseRecord directly using the record_id and given values
Parameters
----------
result : Literal["Ok", "Dropped", "ProcessingFailed"]
processing result, supported value: Ok, Dropped, ProcessingFailed
data : str, optional
data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or
use either function like `data_from_text`, `data_from_json` to populate data
metadata: KinesisFirehoseResponseRecordMetadata, optional
Metadata associated with this record; can contain partition keys
- https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
"""
return KinesisFirehoseDataTransformationRecord(
record_id=self.record_id,
result=result,
data=data,
metadata=metadata,
)


class KinesisFirehoseEvent(DictWrapper):
"""Kinesis Data Firehose event
Expand Down
11 changes: 11 additions & 0 deletions docs/utilities/data_classes.md
Original file line number Diff line number Diff line change
Expand Up @@ -981,12 +981,23 @@ inline, and re-emit them back to the Delivery Stream.
Similar to Kinesis Data Streams, the events contain base64 encoded data. You can use the helper
function to access the data either as json or plain text, depending on the original payload.

When constructing response to Firehose, You can utilize the `KinesisFirehoseResponse` class shown
in the example below.
roger-zhangg marked this conversation as resolved.
Show resolved Hide resolved

=== "app.py"

```python
--8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py"
```

You can also construct response without using `event_source` wrapper. Shown in the example below.

=== "app.py"

```python
--8<-- "examples/event_sources/src/kinesis_firehose_response.py"
```

### Lambda Function URL

=== "app.py"
Expand Down
21 changes: 10 additions & 11 deletions examples/event_sources/src/kinesis_firehose_delivery_stream.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import base64
import json

from aws_lambda_powertools.utilities.data_classes import (
KinesisFirehoseDataTransformationResponse,
KinesisFirehoseEvent,
event_source,
)
Expand All @@ -10,19 +8,20 @@

@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
result = []
result = KinesisFirehoseDataTransformationResponse()

for record in event.records:
# if data was delivered as json; caches loaded value
data = record.data_as_json

processed_record = {
"recordId": record.record_id,
"data": base64.b64encode(json.dumps(data).encode("utf-8")),
"result": "Ok",
}
## do all kind of stuff with data
## generate data to return
new_data = {"tool_used": "powertools_dataclass", "original_payload": data}

processed_record = record.build_data_transformation_response(result="Ok")
processed_record.data_from_json(data=new_data)

leandrodamascena marked this conversation as resolved.
Show resolved Hide resolved
result.append(processed_record)
result.add_record(processed_record)

# return transformed records
return {"records": result}
return result.asdict()
24 changes: 24 additions & 0 deletions examples/event_sources/src/kinesis_firehose_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import base64

from aws_lambda_powertools.utilities.data_classes import (
KinesisFirehoseDataTransformationRecord,
KinesisFirehoseDataTransformationResponse,
)


def lambda_handler(event, context):
result = KinesisFirehoseDataTransformationResponse()

for record in event["records"]:
print(record["recordId"])
payload = base64.b64decode(record["data"]).decode("utf-8")
## do all kind of stuff with payload
## generate data to return
new_data = {"tool_used": "powertools_dataclass", "original_payload": payload}

processed_record = KinesisFirehoseDataTransformationRecord(record_id=record["recordId"], result="Ok")
processed_record.data_from_text(data=new_data)
roger-zhangg marked this conversation as resolved.
Show resolved Hide resolved
result.add_record(processed_record)

# return transformed records
return result.asdict()
69 changes: 69 additions & 0 deletions tests/unit/data_classes/test_kinesis_firehose_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from aws_lambda_powertools.utilities.data_classes import (
KinesisFirehoseDataTransformationRecord,
KinesisFirehoseDataTransformationRecordMetadata,
KinesisFirehoseDataTransformationResponse,
KinesisFirehoseEvent,
)
from tests.functional.utils import load_event


def test_kinesis_firehose_response():
leandrodamascena marked this conversation as resolved.
Show resolved Hide resolved
raw_event = load_event("kinesisFirehoseKinesisEvent.json")
parsed_event = KinesisFirehoseEvent(data=raw_event)

response = KinesisFirehoseDataTransformationResponse()
for record in parsed_event.records:
# if data was delivered as json; caches loaded value
data = record.data_as_text

metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": 2023})
processed_record = KinesisFirehoseDataTransformationRecord(
record_id=record.record_id,
result="Ok",
metadata=metadata_partition,
)
processed_record.data_from_text(data=data)
response.add_record(record=processed_record)
response_dict = response.asdict()

res_records = list(response_dict["records"])
assert len(res_records) == 2
record_01, record_02 = res_records[:]
record01_raw = raw_event["records"][0]
assert record_01["result"] == "Ok"
assert record_01["recordId"] == record01_raw["recordId"]
assert record_01["data"] == record01_raw["data"]
assert record_01["metadata"]["partitionKeys"]["year"] == 2023

assert response.records[0].data_as_bytes == b"Hello World"
assert response.records[0].data_as_text == "Hello World"


def test_kinesis_firehose_create_response():
raw_event = load_event("kinesisFirehoseKinesisEvent.json")
parsed_event = KinesisFirehoseEvent(data=raw_event)

response = KinesisFirehoseDataTransformationResponse()
for record in parsed_event.records:
# if data was delivered as json; caches loaded value
data = record.data_as_text
metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": 2023})
processed_record = record.build_data_transformation_response(
result="Ok",
metadata=metadata_partition,
)
processed_record.data_from_text(data=data)
response.add_record(record=processed_record)
response_dict = response.asdict()

res_records = list(response_dict["records"])
assert len(res_records) == 2
record_01, record_02 = res_records[:]
record01_raw = raw_event["records"][0]
assert record_01["result"] == "Ok"
assert record_01["recordId"] == record01_raw["recordId"]
assert record_01["data"] == record01_raw["data"]
assert record_01["metadata"]["partitionKeys"]["year"] == 2023

assert response.records[0].data_as_bytes == b"Hello World"
assert response.records[0].data_as_text == "Hello World"
Loading