Feature request: Kinesis Firehose Response Record data class #2440

Closed
2 tasks done
troyswanson opened this issue Jun 12, 2023 · 11 comments · Fixed by #3029
Labels
event_sources Event Source Data Class utility feature-request feature request

Comments

@troyswanson
Contributor

troyswanson commented Jun 12, 2023

Use case

Constructing response objects for use in Kinesis Firehose transformation functions.

This is a continuation of #1059 which describes the event object as well as the response object. The implementation for that issue can be found at #1540, but that does not include the response object.

Solution/User Experience

A data class that can be populated during a function's execution and that serializes to a properly formed response to a KinesisFirehoseEvent invocation.

Rough idea

KinesisFirehoseResponse:
   records: list[KinesisFirehoseResponseRecord]
KinesisFirehoseResponseRecord:
   record_id: str
   result: Literal["Ok", "Dropped", "ProcessingFailed"]
   data: bytes
   metadata: KinesisFirehoseResponseRecordMetadata
KinesisFirehoseResponseRecordMetadata:
   partition_keys: dict

Note: ☝🏼 I'm not sure whether this is an exhaustive list of the options that can be returned.

Alternative solutions

Previously, I've used basic dictionaries for this, but it would be nice to have a more structured data class to use.
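For comparison, the plain-dictionary approach might look roughly like the sketch below. It assumes JSON payloads with a `text` field (as in the fixtures later in this thread); the upper-casing transformation is only a placeholder. Nothing checks the key names or the `result` string, which is the gap a data class would close.

```python
import base64
import json


def lambda_handler(event: dict, context: object) -> dict:
    """Upper-case the "text" field of each record and build the response by hand."""
    output = []
    for record in event["records"]:
        try:
            payload = json.loads(base64.b64decode(record["data"]))
            payload["text"] = payload["text"].upper()  # placeholder transformation
            # Firehose expects base64 text, so decode the encoded bytes to str.
            data = base64.b64encode(json.dumps(payload).encode()).decode()
            output.append({"recordId": record["recordId"], "result": "Ok", "data": data})
        except (ValueError, KeyError, TypeError):
            # Invalid base64 or JSON, or an unexpected payload shape:
            # return the original data and mark the record as failed.
            output.append(
                {
                    "recordId": record["recordId"],
                    "result": "ProcessingFailed",
                    "data": record["data"],
                }
            )
    return {"records": output}
```

A typo such as `"recordID"` or `"OK"` in one of these literals would only surface once Firehose rejects the response.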

The Go example in Dynamic Partitioning in Kinesis Data Firehose uses a KinesisFirehoseResponse type from their events package.

I believe it would be possible to re-use the KinesisFirehoseEvent data class from the utilities.data_classes module, but it seems geared toward the event invocation object rather than the response object.

Acknowledgment

@troyswanson troyswanson added feature-request feature request triage Pending triage from maintainers labels Jun 12, 2023
@boring-cyborg

boring-cyborg bot commented Jun 12, 2023

Thanks for opening your first issue here! We'll come back to you as soon as we can.
In the meantime, check out the #python channel on our Powertools for AWS Lambda Discord: Invite link

@troyswanson
Contributor Author

troyswanson commented Jun 13, 2023

Been playing with a custom implementation for my project. Thought I would share it here:

(This uses Python 3.9 so the type alias syntax is a little different than current.)

myproj/dataclasses/kinesis_firehose.py

from typing import Union, Optional, Callable
from dataclasses import dataclass
from base64 import standard_b64encode

from aws_lambda_powertools.utilities.data_classes import KinesisFirehoseEvent


KinesisFirehoseResponseRecord = Union[
    "KinesisFirehoseResponseRecordOk",
    "KinesisFirehoseResponseRecordDropped",
    "KinesisFirehoseResponseRecordFailed",
]


@dataclass
class KinesisFirehoseEventProcessor:

    event: KinesisFirehoseEvent

    def process(self, fn: Callable[..., "KinesisFirehoseProcessedRecord"]):

        response_records: list[KinesisFirehoseResponseRecord] = list()

        for record in self.event.records:
            try:
                processed_record = fn(
                    record=record,
                    invocation_id=self.event.invocation_id,
                    delivery_stream_arn=self.event.delivery_stream_arn,
                    source_kinesis_stream_arn=self.event.source_kinesis_stream_arn,
                    region=self.event.region,
                )
                response_record = KinesisFirehoseResponseRecordOk(
                    record_id=record.record_id,
                    data=processed_record.data,
                    metadata=processed_record.metadata,
                )
            except KinesisFirehoseRecordProcessingDropped:
                response_record = KinesisFirehoseResponseRecordDropped(
                    record_id=record.record_id
                )
            except KinesisFirehoseRecordProcessingFailed:
                response_record = KinesisFirehoseResponseRecordFailed(
                    record_id=record.record_id
                )

            response_records.append(response_record)

        return KinesisFirehoseResponse(records=response_records)


@dataclass
class KinesisFirehoseProcessedRecord:
    data: str
    metadata: Optional["KinesisFirehoseResponseRecordMetadata"] = None


@dataclass
class KinesisFirehoseResponse:
    records: list["KinesisFirehoseResponseRecord"]

    def to_dict(self):
        return {"records": [record.to_dict() for record in self.records]}


@dataclass
class KinesisFirehoseResponseRecordMetadata:
    partition_keys: Optional[dict[str, str]]

    def to_dict(self):

        r = dict()

        if self.partition_keys is not None:
            r["partitionKeys"] = self.partition_keys

        return r


@dataclass
class KinesisFirehoseResponseRecordOk:
    record_id: str
    data: str
    metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None

    @property
    def data_b64encoded(self) -> str:
        # Decode to str: bytes cannot be serialized to JSON in the response.
        return standard_b64encode(self.data.encode()).decode()

    def to_dict(self):

        r = {
            "recordId": self.record_id,
            "result": "Ok",
            "data": self.data_b64encoded,
            "metadata": dict(),
        }

        if self.metadata is not None:
            r["metadata"] = self.metadata.to_dict()

        return r


@dataclass
class KinesisFirehoseResponseRecordFailed:
    record_id: str

    def to_dict(self):
        return {"recordId": self.record_id, "result": "ProcessingFailed"}


@dataclass
class KinesisFirehoseResponseRecordDropped:
    record_id: str

    def to_dict(self):
        return {"recordId": self.record_id, "result": "Dropped"}


class KinesisFirehoseRecordProcessingFailed(Exception):
    ...


class KinesisFirehoseRecordProcessingDropped(Exception):
    ...
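One detail worth checking in any `to_dict()` of this kind: `standard_b64encode` returns `bytes`, and `json.dumps` rejects `bytes`, so the encoded payload needs to be decoded to `str` before the response is serialized. A minimal standalone check using only the standard library (the payload is the first fixture record from this thread):

```python
from base64 import standard_b64decode, standard_b64encode
from json import dumps, loads

encoded = standard_b64encode('{"text":"hello world"}'.encode())

# json.dumps raises TypeError for bytes values.
try:
    dumps({"data": encoded})
    bytes_ok = True
except TypeError:
    bytes_ok = False

# Decoding the base64 bytes to str makes the record serializable.
serialized = dumps({"data": encoded.decode()})

# Round trip: parse the JSON, then undo the base64 step.
round_trip = standard_b64decode(loads(serialized)["data"]).decode()
```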

Example implementation:

tests/conftest.py

import pytest
from aws_lambda_powertools.utilities.data_classes import KinesisFirehoseEvent


@pytest.fixture
def kinesis_firehose_event() -> KinesisFirehoseEvent:
    """
    record1: {"text":"hello world"}
    record2: {"text":"foo bar"}
    """

    return KinesisFirehoseEvent(
        {
            "invocationId": "invoked123",
            "deliveryStreamArn": "aws:lambda:events",
            "region": "us-west-2",
            "records": [
                {
                    "data": "eyJ0ZXh0IjoiaGVsbG8gd29ybGQifQ==",
                    "recordId": "record1",
                    "approximateArrivalTimestamp": 1686589530000,
                    "kinesisRecordMetadata": {
                        "shardId": "shardId-000000000000",
                        "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
                        "approximateArrivalTimestamp": "2023-06-12T17:05:30.000Z",
                        "sequenceNumber": "49546986683135544286507457936321625675700192471156785154",  # noqa: E501
                        "subsequenceNumber": "",
                    },
                },
                {
                    "data": "eyJ0ZXh0IjoiZm9vIGJhciJ9",
                    "recordId": "record2",
                    "approximateArrivalTimestamp": 1686589530000,
                    "kinesisRecordMetadata": {
                        "shardId": "shardId-000000000001",
                        "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
                        "approximateArrivalTimestamp": "2023-06-12T17:05:30.000Z",
                        "sequenceNumber": "49546986683135544286507457936321625675700192471156785155",  # noqa: E501
                        "subsequenceNumber": "",
                    },
                },
            ],
        }
    )

tests/test_kinesis_firehose.py

from json import dumps

from aws_lambda_powertools.utilities.data_classes.kinesis_firehose_event import (
    KinesisFirehoseEvent,
    KinesisFirehoseRecord,
)

from myproj.dataclasses.kinesis_firehose import (
    KinesisFirehoseEventProcessor,
    KinesisFirehoseProcessedRecord,
    KinesisFirehoseResponseRecordOk,
    KinesisFirehoseResponseRecordFailed,
    KinesisFirehoseRecordProcessingFailed,
)


def test_kinesis_firehose_processor(kinesis_firehose_event: KinesisFirehoseEvent):
    def fn(record: KinesisFirehoseRecord, **kwargs) -> KinesisFirehoseProcessedRecord:
        data = record.data_as_json.copy()
        data["len"] = len(data["text"])
        data_as_json = dumps(data, separators=(",", ":"))
        return KinesisFirehoseProcessedRecord(data=data_as_json)

    processor = KinesisFirehoseEventProcessor(kinesis_firehose_event)
    response = processor.process(fn)

    assert isinstance(response.records[0], KinesisFirehoseResponseRecordOk)
    assert response.records[0].record_id == "record1"
    assert response.records[0].data == '{"text":"hello world","len":11}'
    assert isinstance(response.records[1], KinesisFirehoseResponseRecordOk)
    assert response.records[1].record_id == "record2"
    assert response.records[1].data == '{"text":"foo bar","len":7}'
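The test above only covers the `Ok` path. The sketch below shows how the `Dropped` and `ProcessingFailed` paths could be exercised with the same exception-driven control flow. So that it runs without Powertools or `myproj`, every class and function in it (`FakeRecord`, `RecordDropped`, `RecordFailed`, `process`) is a hypothetical stand-in, and base64 encoding is left out for brevity.

```python
from dataclasses import dataclass
from typing import Callable


class RecordDropped(Exception):
    """Hypothetical stand-in for KinesisFirehoseRecordProcessingDropped."""


class RecordFailed(Exception):
    """Hypothetical stand-in for KinesisFirehoseRecordProcessingFailed."""


@dataclass
class FakeRecord:
    """Hypothetical stand-in for a Firehose record: an ID plus decoded text."""

    record_id: str
    text: str


def process(records: list[FakeRecord], fn: Callable[[FakeRecord], str]) -> list[dict]:
    """Mirror the processor's control flow: the exception raised picks the result."""
    results = []
    for record in records:
        try:
            results.append({"recordId": record.record_id, "result": "Ok", "data": fn(record)})
        except RecordDropped:
            results.append({"recordId": record.record_id, "result": "Dropped"})
        except RecordFailed:
            results.append({"recordId": record.record_id, "result": "ProcessingFailed"})
    return results


def fn(record: FakeRecord) -> str:
    if not record.text:
        raise RecordDropped  # nothing worth delivering
    if not record.text.isascii():
        raise RecordFailed  # pretend non-ASCII input cannot be processed
    return record.text.upper()


results = process(
    [FakeRecord("r1", "hello"), FakeRecord("r2", ""), FakeRecord("r3", "héllo")],
    fn,
)
```

Raising from the callback keeps the per-record function free of response-building details, at the cost of using exceptions for ordinary control flow.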

@rubenfonseca
Contributor

Hi @troyswanson, thank you for opening this! Since the response object can be quite complex, I agree that we could benefit from adding those classes to our data classes.

For reference, here are the Go types (https://github.com/aws/aws-lambda-go/blob/main/events/firehose.go#L28-L49).

I can see that you already have some code too. I would love it if you could submit a PR for this! What do you think?

@rubenfonseca rubenfonseca added event_sources Event Source Data Class utility and removed triage Pending triage from maintainers labels Jun 15, 2023
@rubenfonseca rubenfonseca self-assigned this Jun 15, 2023
@sthulb sthulb moved this from Triage to Working on it in Powertools for AWS Lambda (Python) Jun 19, 2023
@leandrodamascena leandrodamascena moved this from Working on it to Pending customer in Powertools for AWS Lambda (Python) Jun 20, 2023
@heitorlessa heitorlessa added the help wanted Could use a second pair of eyes/hands label Jul 10, 2023
@heitorlessa heitorlessa moved this from Pending customer to Backlog in Powertools for AWS Lambda (Python) Jul 10, 2023
@troyswanson
Contributor Author

@rubenfonseca I can check with my company to see if I can get some time to contribute this back as an official PR. In the meantime, if someone else is able to essentially copy/paste the code that I added to this issue, I would be fine with that too!

@leandrodamascena
Contributor

Hi @troyswanson! We'll be adding this new class to our EventSource utility, and when we release it, we'll give you credit in our release notes for your great job in sending us nearly finished code.

Thank you so much

@leandrodamascena leandrodamascena self-assigned this Aug 17, 2023
@leandrodamascena leandrodamascena removed the help wanted Could use a second pair of eyes/hands label Aug 17, 2023
@troyswanson
Contributor Author

@leandrodamascena Cool, thank you!

The code that I included has a batch processor built in, but it would probably be more appropriate to add that functionality into the https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/ tooling that already exists.

I admittedly don't have experience with that part of the Powertools library.

Anyways, I'm excited to see your implementation!

@roger-zhangg
Member

roger-zhangg commented Aug 24, 2023

Hi @troyswanson
Thanks for your suggestion. We understand that you are using Kinesis Firehose to achieve batch processing. However, Kinesis Firehose is not currently supported in batch processing because, unlike SQS and Kinesis Data Streams, it doesn't have a batch failure mechanism.

If you think it would be helpful, we would like to see you submit an RFC specifically for supporting Kinesis Firehose in batch processing. This would allow us to focus on the main issue in this feature request, which is supporting the Kinesis Firehose Response Record data class.

Leandro assigned this task to me and I'm working on adding the support. The current target is to submit a pull request by the end of next week. I'll ping you when there's an update so you can review it before we merge. We appreciate your feedback and will keep you updated on our progress.
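For context on the batch-failure point, the two response shapes differ roughly as sketched below. The identifiers and payloads are illustrative: the message ID is made up, and the Firehose records reuse the fixture data from earlier in this thread.

```python
# Shape a Lambda returns to SQS or Kinesis Data Streams when partial batch
# responses are enabled: only the failed items are listed, and the service
# retries just those.
partial_batch_response = {
    "batchItemFailures": [{"itemIdentifier": "message-id-2"}],
}

# Shape a Firehose transformation Lambda returns: every input record gets an
# entry with its own result ("Ok", "Dropped", or "ProcessingFailed").
firehose_response = {
    "records": [
        {
            "recordId": "record1",
            "result": "Ok",
            "data": "eyJ0ZXh0IjoiaGVsbG8gd29ybGQifQ==",
        },
        {
            "recordId": "record2",
            "result": "ProcessingFailed",
            "data": "eyJ0ZXh0IjoiZm9vIGJhciJ9",
        },
    ],
}
```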

@troyswanson
Contributor Author

@roger-zhangg Thanks for the update!

@roger-zhangg
Member

Hey @troyswanson I'm excited to share the PR I've opened #3029. We would love to see you review it and share your comments. Thanks!

@github-actions
Contributor

⚠️COMMENT VISIBILITY WARNING⚠️

This issue is now closed. Please be mindful that future comments are hard for our team to see.

If you need more assistance, please either tag a team member or open a new issue that references this one.

If you wish to keep having a conversation with other community members under this issue feel free to do so.

@github-actions github-actions bot added the pending-release Fix or implementation already in dev waiting to be released label Sep 14, 2023
@github-actions
Contributor

This is now released in version 2.25.0!

@github-actions github-actions bot removed the pending-release Fix or implementation already in dev waiting to be released label Sep 15, 2023
@heitorlessa heitorlessa moved this from Coming soon to Shipped in Powertools for AWS Lambda (Python) Sep 18, 2023
Projects
Status: Shipped
5 participants