feat(event_source): add Kinesis Firehose Data Transformation data class #3029

Merged · 52 commits · Sep 14, 2023

Commits
040358d
support kinesis response
roger-zhangg Aug 30, 2023
6eb8530
Merge branch 'develop' into kinesis
leandrodamascena Aug 30, 2023
7ca016e
fix lint, address Leandro suggestions
roger-zhangg Aug 30, 2023
785a144
Merge branch 'develop' of https://github.com/aws-powertools/powertool…
roger-zhangg Aug 30, 2023
708686f
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Aug 30, 2023
8c9db32
remove deleted const
roger-zhangg Aug 30, 2023
f02319f
fix Literal import in 3.7
roger-zhangg Aug 30, 2023
5f55aa7
change to use data-classes
roger-zhangg Sep 1, 2023
2566f62
fix mypy
roger-zhangg Sep 1, 2023
636e9d1
fix typo, make asdict a function
roger-zhangg Sep 1, 2023
d3114c4
Merge branch 'develop' into kinesis
leandrodamascena Sep 4, 2023
3fa7b2d
Merge branch 'develop' into kinesis
leandrodamascena Sep 5, 2023
370e156
address Troy/Leandro suggestions
roger-zhangg Sep 5, 2023
99837fc
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Sep 5, 2023
029a55c
Merge branch 'develop' into kinesis
leandrodamascena Sep 6, 2023
af1abfe
remove 6MB comment
roger-zhangg Sep 6, 2023
2032146
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Sep 6, 2023
4168e21
Merge branch 'develop' into kinesis
leandrodamascena Sep 7, 2023
312830b
fix comments
roger-zhangg Sep 7, 2023
a6c05a5
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg Sep 7, 2023
a3ed9f9
address Heitor's suggestion
roger-zhangg Sep 7, 2023
bfbee60
data class default optimization
roger-zhangg Sep 7, 2023
4016446
remove slot for static check
roger-zhangg Sep 7, 2023
5dbb3ff
fix doc, example
roger-zhangg Sep 7, 2023
5b8a9c6
Merge branch 'develop' into kinesis
leandrodamascena Sep 7, 2023
95b3958
Merge branch 'develop' into kinesis
leandrodamascena Sep 11, 2023
e4d75d7
rename r->record
roger-zhangg Sep 11, 2023
f5ca27d
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg Sep 11, 2023
46fbe98
Merge branch 'develop' into kinesis
leandrodamascena Sep 11, 2023
ddd49d2
Merge branch 'develop' into kinesis
leandrodamascena Sep 12, 2023
ce6ed61
Addressing Heitor's feedback
leandrodamascena Sep 12, 2023
d8be53e
Addressing Heitor's feedback
leandrodamascena Sep 12, 2023
3a11563
Addressing Heitor's feedback
leandrodamascena Sep 12, 2023
17c6763
add result warning, add asdict test, metadata test
roger-zhangg Sep 12, 2023
00051a8
Merge branch 'develop' into kinesis
roger-zhangg Sep 13, 2023
455402a
Merge branch 'develop' into kinesis
leandrodamascena Sep 14, 2023
cad6c09
refactor: initial refactoring
heitorlessa Sep 12, 2023
6809e0e
chore: branding
heitorlessa Sep 14, 2023
42a0de9
refactor: use classvar and tuple for perf
heitorlessa Sep 14, 2023
d05ca07
chore: fix rebase issue
heitorlessa Sep 14, 2023
b20137a
chore: fix mypy tuple exactness type
heitorlessa Sep 14, 2023
ef12496
remove Ok in example response,add failure example
roger-zhangg Sep 14, 2023
f2d5e63
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg Sep 14, 2023
3e43a25
chore: clean up docs example
heitorlessa Sep 14, 2023
4ed0e66
chore: lower cognitive overhead; add example docstring
heitorlessa Sep 14, 2023
d1fc1c5
add drop example
roger-zhangg Sep 14, 2023
eb39c03
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg Sep 14, 2023
c039480
docs: give info upfront, name examples
heitorlessa Sep 14, 2023
e11c718
docs: improve transforming records example
heitorlessa Sep 14, 2023
745492c
docs: improve dropping records example
heitorlessa Sep 14, 2023
61ddee9
docs: improve exception example
heitorlessa Sep 14, 2023
6f95e8f
Merge branch 'develop' into kinesis
leandrodamascena Sep 14, 2023
1 change: 1 addition & 0 deletions aws_lambda_powertools/shared/__init__.py
@@ -0,0 +1 @@
"""Internal shared functions. Do not rely on it besides internal usage."""
10 changes: 9 additions & 1 deletion aws_lambda_powertools/utilities/data_classes/__init__.py
@@ -14,7 +14,12 @@
from .event_bridge_event import EventBridgeEvent
from .event_source import event_source
from .kafka_event import KafkaEvent
from .kinesis_firehose_event import KinesisFirehoseEvent
from .kinesis_firehose_event import (
KinesisFirehoseDataTransformationRecord,
KinesisFirehoseDataTransformationRecordMetadata,
KinesisFirehoseDataTransformationResponse,
KinesisFirehoseEvent,
)
from .kinesis_stream_event import KinesisStreamEvent
from .lambda_function_url_event import LambdaFunctionUrlEvent
from .s3_event import S3Event, S3EventBridgeNotificationEvent
@@ -39,6 +44,9 @@
"KafkaEvent",
"KinesisFirehoseEvent",
"KinesisStreamEvent",
"KinesisFirehoseDataTransformationResponse",
"KinesisFirehoseDataTransformationRecord",
"KinesisFirehoseDataTransformationRecordMetadata",
"LambdaFunctionUrlEvent",
"S3Event",
"S3EventBridgeNotificationEvent",
aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py
@@ -1,9 +1,179 @@
import base64
from typing import Iterator, Optional
import json
import warnings
from dataclasses import dataclass, field
from typing import Any, Callable, ClassVar, Dict, Iterator, List, Optional, Tuple

from typing_extensions import Literal

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


@dataclass(repr=False, order=False, frozen=True)
class KinesisFirehoseDataTransformationRecordMetadata:
"""
Metadata in a Kinesis Data Firehose Data Transformation record.

Parameters
----------
partition_keys: Dict[str, str]
A dict of partition key/value pairs in string format, e.g. `{"year":"2023","month":"09"}`

Documentation:
--------------
- https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
"""

partition_keys: Dict[str, str] = field(default_factory=lambda: {})

def asdict(self) -> Dict:
if self.partition_keys is not None:
return {"partitionKeys": self.partition_keys}
return {}


@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationRecord:
"""Record in Kinesis Data Firehose response object.

Parameters
----------
record_id: str
uniquely identifies this record within the current batch
result: Literal["Ok", "Dropped", "ProcessingFailed"]
record data transformation status, whether it succeeded, should be dropped, or failed.
data: str
base64-encoded payload, by default empty string.

Use `data_from_text` or `data_from_json` methods to convert data if needed.

metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata]
Metadata associated with this record; can contain partition keys.

See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
json_serializer: Callable
function to serialize `obj` to a JSON formatted `str`, by default json.dumps
json_deserializer: Callable
function to deserialize `str`, `bytes`, or `bytearray` containing a JSON document to a Python `obj`,
by default json.loads

Documentation:
--------------
- https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
"""

_valid_result_types: ClassVar[Tuple[str, str, str]] = ("Ok", "Dropped", "ProcessingFailed")

record_id: str
result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok"
data: str = ""
metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None
json_serializer: Callable = json.dumps
json_deserializer: Callable = json.loads
_json_data: Optional[Any] = None

def asdict(self) -> Dict:
if self.result not in self._valid_result_types:
warnings.warn(
stacklevel=1,
message=f'The result "{self.result}" is not valid; choose from "Ok", "Dropped", "ProcessingFailed"',
)

record: Dict[str, Any] = {
"recordId": self.record_id,
"result": self.result,
"data": self.data,
}
if self.metadata:
record["metadata"] = self.metadata.asdict()
return record

@property
def data_as_bytes(self) -> bytes:
"""Decoded base64-encoded data as bytes"""
if not self.data:
return b""
return base64.b64decode(self.data)

@property
def data_as_text(self) -> str:
"""Decoded base64-encoded data as text"""
if not self.data:
return ""
return self.data_as_bytes.decode("utf-8")

@property
def data_as_json(self) -> Dict:
"""Decoded base64-encoded data loaded to json"""
if not self.data:
return {}
if self._json_data is None:
self._json_data = self.json_deserializer(self.data_as_text)
return self._json_data


@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationResponse:
"""Kinesis Data Firehose response object

Documentation:
--------------
- https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

Parameters
----------
records : List[KinesisFirehoseDataTransformationRecord]
records of the Kinesis Data Firehose response object;
optional at initialization, records can be added later using the `add_record` method.

Examples
--------

**Transforming data records**

```python
from aws_lambda_powertools.utilities.data_classes import (
KinesisFirehoseDataTransformationRecord,
KinesisFirehoseDataTransformationResponse,
KinesisFirehoseEvent,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


def lambda_handler(event: dict, context: LambdaContext):
firehose_event = KinesisFirehoseEvent(event)
result = KinesisFirehoseDataTransformationResponse()

for record in firehose_event.records:
payload = record.data_as_text # base64 decoded data as str

## generate data to return
transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
processed_record = KinesisFirehoseDataTransformationRecord(
record_id=record.record_id,
data=base64_from_json(transformed_data),
)

result.add_record(processed_record)

# return transformed records
return result.asdict()
```
"""

records: List[KinesisFirehoseDataTransformationRecord] = field(default_factory=list)

def add_record(self, record: KinesisFirehoseDataTransformationRecord):
self.records.append(record)

def asdict(self) -> Dict:
if not self.records:
raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response")

return {"records": [record.asdict() for record in self.records]}


class KinesisFirehoseRecordMetadata(DictWrapper):
@property
def _metadata(self) -> dict:
@@ -77,6 +247,32 @@ def data_as_json(self) -> dict:
self._json_data = self._json_deserializer(self.data_as_text)
return self._json_data

def build_data_transformation_response(
self,
result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok",
data: str = "",
metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None,
) -> KinesisFirehoseDataTransformationRecord:
"""Create a KinesisFirehoseResponseRecord directly using the record_id and given values

Parameters
----------
result : Literal["Ok", "Dropped", "ProcessingFailed"]
processing result; supported values: Ok, Dropped, ProcessingFailed
data : str, optional
base64-encoded data blob, optional at init. Pass base64-encoded data directly, or
use helpers such as `data_from_text` or `data_from_json` to populate it
metadata: KinesisFirehoseDataTransformationRecordMetadata, optional
Metadata associated with this record; can contain partition keys
- https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
"""
return KinesisFirehoseDataTransformationRecord(
record_id=self.record_id,
result=result,
data=data,
metadata=metadata,
)


class KinesisFirehoseEvent(DictWrapper):
"""Kinesis Data Firehose event
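For orientation, here is a minimal sketch (not part of this diff) of the per-record structure these new classes produce, using only the API shown above; the record id and partition keys are made-up illustrative values:

```python
from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
    KinesisFirehoseDataTransformationRecordMetadata,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json

# hypothetical record id and partition keys, for illustration only
metadata = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": "2023", "month": "09"})
record = KinesisFirehoseDataTransformationRecord(
    record_id="record-1",
    data=base64_from_json({"status": "transformed"}),
    metadata=metadata,
)

# asdict() returns the shape Firehose expects for each transformed record:
# {"recordId": "record-1", "result": "Ok", "data": "<base64>", "metadata": {"partitionKeys": {"year": "2023", "month": "09"}}}
print(record.asdict())
```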
59 changes: 59 additions & 0 deletions aws_lambda_powertools/utilities/serialization.py
@@ -0,0 +1,59 @@
"""Standalone functions to serialize/deserialize common data structures"""
import base64
import json
from typing import Any, Callable


def base64_encode(data: str) -> str:
"""Encode a string and returns Base64-encoded encoded value.

Parameters
----------
data: str
The string to encode.

Returns
-------
str
The Base64-encoded value.
"""
return base64.b64encode(data.encode()).decode("utf-8")


def base64_decode(data: str) -> str:
"""Decodes a Base64-encoded string and returns the decoded value.

Parameters
----------
data: str
The Base64-encoded string to decode.

Returns
-------
str
The decoded string value.
"""
return base64.b64decode(data).decode("utf-8")


def base64_from_str(data: str) -> str:
"""Encode str as base64 string"""
return base64.b64encode(data.encode()).decode("utf-8")


def base64_from_json(data: Any, json_serializer: Callable[..., str] = json.dumps) -> str:
"""Encode JSON serializable data as base64 string

Parameters
----------
data: Any
JSON serializable (dict, list, boolean, etc.)
json_serializer: Callable
function to serialize `obj` to a JSON formatted `str`, by default json.dumps

Returns
-------
str:
JSON string as base64 string
"""
return base64_from_str(data=json_serializer(data))
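As a quick illustration (not part of the PR), these helpers round-trip cleanly; the payload below is an arbitrary example value:

```python
import json

from aws_lambda_powertools.utilities.serialization import base64_decode, base64_from_json

payload = {"tool_used": "powertools_dataclass", "result": "Ok"}  # arbitrary example payload

encoded = base64_from_json(payload)           # JSON-serialize, then base64-encode
decoded = json.loads(base64_decode(encoded))  # base64-decode, then parse the JSON back

assert decoded == payload
```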
33 changes: 27 additions & 6 deletions docs/utilities/data_classes.md
@@ -975,18 +975,39 @@ or plain text, depending on the original payload.

### Kinesis Firehose delivery stream

Kinesis Firehose Data Transformation can use a Lambda Function to modify the records
inline, and re-emit them back to the Delivery Stream.
When using Kinesis Firehose, you can use a Lambda function to [perform data transformation](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html){target="_blank"}. For each transformed record, you can choose to either:

Similar to Kinesis Data Streams, the events contain base64 encoded data. You can use the helper
function to access the data either as json or plain text, depending on the original payload.
* **A)** Put them back to the delivery stream (default)
* **B)** Drop them so consumers don't receive them (e.g., data validation)
* **C)** Indicate a record failed data transformation and should be retried

=== "app.py"
To do that, you can use the `KinesisFirehoseDataTransformationResponse` class along with helper functions to make it easier to decode and encode base64 data in the stream.

```python
=== "Transforming streaming records"

```python hl_lines="2-3 12 28"
--8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py"
```

1. **Ingesting JSON payloads?** <br><br> Use `record.data_as_json` to easily deserialize them.
2. For your convenience, `base64_from_json` serializes a dict to JSON, then encodes it as base64 data.

=== "Dropping invalid records"

```python hl_lines="5-6 16 34"
--8<-- "examples/event_sources/src/kinesis_firehose_response_drop.py"
```

1. This exception would be raised by `record.data_as_json` if the payload is invalid.

=== "Indicating a processing failure"

```python hl_lines="2-3 33"
--8<-- "examples/event_sources/src/kinesis_firehose_response_exception.py"
```

1. This record will now be sent to your [S3 bucket in the `processing-failed` folder](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html#data-transformation-failure-handling){target="_blank"}.

### Lambda Function URL

=== "app.py"
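The drop and failure examples referenced in the docs above (`kinesis_firehose_response_drop.py`, `kinesis_firehose_response_exception.py`) are not shown in this diff. A rough sketch of what dropping invalid records could look like with this API — an assumption for illustration, not the actual example files:

```python
import json

from aws_lambda_powertools.utilities.data_classes import (
    KinesisFirehoseDataTransformationRecord,
    KinesisFirehoseDataTransformationResponse,
    KinesisFirehoseEvent,
    event_source,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
    result = KinesisFirehoseDataTransformationResponse()

    for record in event.records:
        try:
            payload = record.data_as_json  # raises if the payload is not valid JSON
        except json.JSONDecodeError:
            # drop the record so downstream consumers never receive it
            result.add_record(
                KinesisFirehoseDataTransformationRecord(record_id=record.record_id, result="Dropped"),
            )
            continue

        # put the (unchanged) payload back onto the delivery stream
        result.add_record(
            record.build_data_transformation_response(data=base64_from_json(payload)),
        )

    return result.asdict()
```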
26 changes: 13 additions & 13 deletions examples/event_sources/src/kinesis_firehose_delivery_stream.py
@@ -1,28 +1,28 @@
import base64
import json

from aws_lambda_powertools.utilities.data_classes import (
KinesisFirehoseDataTransformationResponse,
KinesisFirehoseEvent,
event_source,
)
from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
result = []
result = KinesisFirehoseDataTransformationResponse()

for record in event.records:
# if data was delivered as json; caches loaded value
data = record.data_as_json
# get original data using data_as_text property
data = record.data_as_text # (1)!

## generate data to return
transformed_data = {"new_data": "transformed data using Powertools", "original_payload": data}

processed_record = {
"recordId": record.record_id,
"data": base64.b64encode(json.dumps(data).encode("utf-8")),
"result": "Ok",
}
processed_record = record.build_data_transformation_response(
data=base64_from_json(transformed_data), # (2)!
)

result.append(processed_record)
result.add_record(processed_record)

# return transformed records
return {"records": result}
return result.asdict()