Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(idempotency): support methods with the same name (ABCs) by including fully qualified name in v2 #1535

Merged
2 changes: 1 addition & 1 deletion aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def __init__(
self.fn_kwargs = function_kwargs
self.config = config

persistence_store.configure(config, self.function.__name__)
persistence_store.configure(config, f"{self.function.__module__}.{self.function.__qualname__}")
self.persistence_store = persistence_store

def handle(self) -> Any:
Expand Down
2 changes: 1 addition & 1 deletion docs/utilities/idempotency.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ If you're not [changing the default configuration for the DynamoDB persistence l
| TTL attribute name | `expiration` | This can only be configured after your table is created if you're using AWS Console |

???+ tip "Tip: You can share a single state table for all functions"
You can reuse the same DynamoDB table to store idempotency state. We add your `function_name` in addition to the idempotency key as a hash key.
You can reuse the same DynamoDB table to store idempotency state. We add `module_name` and [qualified name for classes and functions](https://peps.python.org/pep-3155/) in addition to the idempotency key as a hash key.

```yaml hl_lines="5-13 21-23" title="AWS Serverless Application Model (SAM) example"
Resources:
Expand Down
Empty file.
19 changes: 19 additions & 0 deletions tests/e2e/idempotency/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import pytest

from tests.e2e.idempotency.infrastructure import IdempotencyDynamoDBStack


@pytest.fixture(autouse=True, scope="module")
def infrastructure(tmp_path_factory, worker_id):
"""Setup and teardown logic for E2E test infrastructure

Yields
------
Dict[str, str]
CloudFormation Outputs from deployed infrastructure
"""
stack = IdempotencyDynamoDBStack()
try:
yield stack.deploy()
finally:
stack.delete()
13 changes: 13 additions & 0 deletions tests/e2e/idempotency/handlers/parallel_execution_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import time

from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event, context):

time.sleep(10)

return event
14 changes: 14 additions & 0 deletions tests/e2e/idempotency/handlers/ttl_cache_expiration_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import time

from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(expires_after_seconds=20)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context):

time_now = time.time()

return {"time": str(time_now)}
15 changes: 15 additions & 0 deletions tests/e2e/idempotency/handlers/ttl_cache_timeout_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import time

from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
config = IdempotencyConfig(expires_after_seconds=1)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context):

sleep_time: int = event.get("sleep") or 0
time.sleep(sleep_time)

return event
29 changes: 29 additions & 0 deletions tests/e2e/idempotency/infrastructure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import Any

from aws_cdk import CfnOutput, RemovalPolicy
from aws_cdk import aws_dynamodb as dynamodb

from tests.e2e.utils.infrastructure import BaseInfrastructure


class IdempotencyDynamoDBStack(BaseInfrastructure):
def create_resources(self):
functions = self.create_lambda_functions()
self._create_dynamodb_table(function=functions)

def _create_dynamodb_table(self, function: Any):
table = dynamodb.Table(
self.stack,
"Idempotency",
table_name="IdempotencyTable",
removal_policy=RemovalPolicy.DESTROY,
partition_key=dynamodb.Attribute(name="id", type=dynamodb.AttributeType.STRING),
time_to_live_attribute="expiration",
billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
)

table.grant_read_write_data(function["TtlCacheExpirationHandler"])
table.grant_read_write_data(function["TtlCacheTimeoutHandler"])
table.grant_read_write_data(function["ParallelExecutionHandler"])

CfnOutput(self.stack, "DynamoDBTable", value=table.table_name)
97 changes: 97 additions & 0 deletions tests/e2e/idempotency/test_idempotency_dynamodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import json
from time import sleep

import pytest

from tests.e2e.utils import data_fetcher
from tests.e2e.utils.functions import execute_lambdas_in_parallel


@pytest.fixture
def ttl_cache_expiration_handler_fn_arn(infrastructure: dict) -> str:
return infrastructure.get("TtlCacheExpirationHandlerArn", "")


@pytest.fixture
def ttl_cache_timeout_handler_fn_arn(infrastructure: dict) -> str:
return infrastructure.get("TtlCacheTimeoutHandlerArn", "")


@pytest.fixture
def parallel_execution_handler_fn_arn(infrastructure: dict) -> str:
return infrastructure.get("ParallelExecutionHandlerArn", "")


@pytest.fixture
def idempotency_table_name(infrastructure: dict) -> str:
return infrastructure.get("DynamoDBTable", "")


def test_ttl_caching_expiration_idempotency(ttl_cache_expiration_handler_fn_arn: str):
# GIVEN
payload = json.dumps({"message": "Lambda Powertools - TTL 20s"})

# WHEN
# first execution
first_execution, _ = data_fetcher.get_lambda_response(
lambda_arn=ttl_cache_expiration_handler_fn_arn, payload=payload
)
first_execution_response = first_execution["Payload"].read().decode("utf-8")

# the second execution should return the same response as the first execution
second_execution, _ = data_fetcher.get_lambda_response(
lambda_arn=ttl_cache_expiration_handler_fn_arn, payload=payload
)
second_execution_response = second_execution["Payload"].read().decode("utf-8")

# wait 20s to expire ttl and execute again, this should return a new response value
sleep(20)
third_execution, _ = data_fetcher.get_lambda_response(
lambda_arn=ttl_cache_expiration_handler_fn_arn, payload=payload
)
third_execution_response = third_execution["Payload"].read().decode("utf-8")

# THEN
assert first_execution_response == second_execution_response
assert third_execution_response != second_execution_response


def test_ttl_caching_timeout_idempotency(ttl_cache_timeout_handler_fn_arn: str):
# GIVEN
payload_timeout_execution = json.dumps({"sleep": 10, "message": "Lambda Powertools - TTL 1s"})
payload_working_execution = json.dumps({"sleep": 0, "message": "Lambda Powertools - TTL 1s"})

# WHEN
# first call should fail due to timeout
execution_with_timeout, _ = data_fetcher.get_lambda_response(
lambda_arn=ttl_cache_timeout_handler_fn_arn, payload=payload_timeout_execution
)
execution_with_timeout_response = execution_with_timeout["Payload"].read().decode("utf-8")

# the second call should work and return the payload
execution_working, _ = data_fetcher.get_lambda_response(
lambda_arn=ttl_cache_timeout_handler_fn_arn, payload=payload_working_execution
)
execution_working_response = execution_working["Payload"].read().decode("utf-8")

# THEN
assert "Task timed out after" in execution_with_timeout_response
assert payload_working_execution == execution_working_response


def test_parallel_execution_idempotency(parallel_execution_handler_fn_arn: str):
# GIVEN
arguments = {"lambda_arn": parallel_execution_handler_fn_arn}

# WHEN
# executing Lambdas in parallel
execution_result_list = execute_lambdas_in_parallel(
[data_fetcher.get_lambda_response, data_fetcher.get_lambda_response], arguments
)

timeout_execution_response = execution_result_list[0][0]["Payload"].read().decode("utf-8")
error_idempotency_execution_response = execution_result_list[1][0]["Payload"].read().decode("utf-8")
leandrodamascena marked this conversation as resolved.
Show resolved Hide resolved

# THEN
assert "Execution already in progress with idempotency key" in error_idempotency_execution_response
assert "Task timed out after" in timeout_execution_response
1 change: 1 addition & 0 deletions tests/e2e/utils/data_fetcher/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from tests.e2e.utils.data_fetcher.common import get_http_response, get_lambda_response
from tests.e2e.utils.data_fetcher.idempotency import get_ddb_idempotency_record
from tests.e2e.utils.data_fetcher.logs import get_logs
from tests.e2e.utils.data_fetcher.metrics import get_metrics
from tests.e2e.utils.data_fetcher.traces import get_traces
39 changes: 39 additions & 0 deletions tests/e2e/utils/data_fetcher/idempotency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import boto3
from retry import retry


@retry(ValueError, delay=2, jitter=1.5, tries=10)
def get_ddb_idempotency_record(
function_name: str,
table_name: str,
) -> int:
"""_summary_

Parameters
----------
function_name : str
Name of Lambda function to fetch dynamodb record
table_name : str
Name of DynamoDB table

Returns
-------
int
Count of records found

Raises
------
ValueError
When no record is found within retry window
"""
ddb_client = boto3.resource("dynamodb")
table = ddb_client.Table(table_name)
ret = table.scan(
FilterExpression="contains (id, :functionName)",
ExpressionAttributeValues={":functionName": f"{function_name}#"},
)

if not ret["Items"]:
raise ValueError("Empty response from DynamoDB Repeating...")

return ret["Count"]
11 changes: 11 additions & 0 deletions tests/e2e/utils/functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from concurrent.futures import ThreadPoolExecutor


def execute_lambdas_in_parallel(tasks, arguments):
result_list = []
with ThreadPoolExecutor() as executor:
running_tasks = [executor.submit(task, **arguments) for task in tasks]
for running_task in running_tasks:
result_list.append(running_task.result())

return result_list
14 changes: 10 additions & 4 deletions tests/functional/idempotency/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,24 @@ def expected_params_put_item_with_validation(hashed_idempotency_key, hashed_vali


@pytest.fixture
def hashed_idempotency_key(lambda_apigw_event, default_jmespath, lambda_context):
def hashed_idempotency_key(request, lambda_apigw_event, default_jmespath, lambda_context):
compiled_jmespath = jmespath.compile(default_jmespath)
data = compiled_jmespath.search(lambda_apigw_event)
return "test-func.lambda_handler#" + hash_idempotency_key(data)
return (
f"test-func.{request.function.__module__}.{request.function.__qualname__}.<locals>.lambda_handler#"
+ hash_idempotency_key(data)
)


@pytest.fixture
def hashed_idempotency_key_with_envelope(lambda_apigw_event):
def hashed_idempotency_key_with_envelope(request, lambda_apigw_event):
event = extract_data_from_envelope(
data=lambda_apigw_event, envelope=envelopes.API_GATEWAY_HTTP, jmespath_options={}
)
return "test-func.lambda_handler#" + hash_idempotency_key(event)
return (
f"test-func.{request.function.__module__}.{request.function.__qualname__}.<locals>.lambda_handler#"
+ hash_idempotency_key(event)
)


@pytest.fixture
Expand Down
Loading